Source code for dials.algorithms.scaling.outlier_rejection

"""
Definitions of outlier rejection algorithms.

These algorithms use the Ih_table datastructures to perform calculations
in groups of symmetry equivalent reflections. Two functions are provided,
reject_outliers, to reject outlier and set flags given a reflection table
and experiment object, and determine_outlier_index_arrays, which takes an
Ih_table and returns flex.size_t index arrays of the outlier positions.
"""

import copy
import logging

import numpy as np

from dxtbx import flumpy
from scitbx.array_family import flex

from dials.algorithms.scaling.Ih_table import IhTable
from dials.util.normalisation import quasi_normalisation
from dials_scaling_ext import determine_outlier_indices, limit_outlier_weights

logger = logging.getLogger("dials")


[docs]def reject_outliers(reflection_table, experiment, method="standard", zmax=6.0): """ Run an outlier algorithm on symmetry-equivalent intensities. This method runs an intensity-based outlier rejection algorithm, comparing the deviations from the weighted mean in groups of symmetry equivalent reflections. The outliers are determined and the outlier_in_scaling flag is set in the reflection table. The values intensity and variance must be set in the reflection table; these should be corrected but unscaled values, as an inverse_scale_factor will be applied during outlier rejection if this is present in the reflection table. The reflection table should also be prefiltered (e.g. not-integrated reflections should not be present) as no further filtering is done on the input table. Args: reflection_table: A reflection table. experiment: A single experiment object. method (str): Name (alias) of outlier rejection algorithm to use. zmax (float): Normalised deviation threshold for classifying an outlier. Returns: reflection_table: The input table with the outlier_in_scaling flag set. """ assert "intensity" in reflection_table, "reflection table has no 'intensity' column" assert "variance" in reflection_table, "reflection table has no 'variance' column" if "inverse_scale_factor" not in reflection_table: reflection_table["inverse_scale_factor"] = flex.double( reflection_table.size(), 1.0 ) Ih_table = IhTable( [reflection_table], experiment.crystal.get_space_group(), nblocks=1 ) outlier_indices = determine_outlier_index_arrays( Ih_table, method=method, zmax=zmax )[0] # Unset any existing outlier flags before setting the new ones reflection_table.unset_flags( reflection_table.get_flags(reflection_table.flags.outlier_in_scaling), reflection_table.flags.outlier_in_scaling, ) reflection_table.set_flags( flumpy.from_numpy(outlier_indices), reflection_table.flags.outlier_in_scaling ) return reflection_table
[docs]def determine_Esq_outlier_index_arrays(Ih_table, experiment, emax=10.0): # first calculate normalised intensities and set in the Ih_table. intensities = Ih_table.as_miller_array(experiment.crystal.get_unit_cell()) normalised_intensities = quasi_normalisation(intensities) sel = normalised_intensities.data() > (emax ** 2) n_e2_outliers = sel.count(True) if n_e2_outliers: logger.info( f"{n_e2_outliers} outliers identified from normalised intensity analysis (E\xb2 > {(emax ** 2)})" ) outlier_indices = ( Ih_table.Ih_table_blocks[0] .Ih_table["loc_indices"] .iloc[flumpy.to_numpy(sel)] .to_numpy() ) if Ih_table.n_datasets == 1: return [outlier_indices] datasets = ( Ih_table.Ih_table_blocks[0] .Ih_table["dataset_id"] .iloc[flumpy.to_numpy(sel)] .to_numpy() ) final_outlier_arrays = [] for i in range(Ih_table.n_datasets): final_outlier_arrays.append(outlier_indices[datasets == i]) return final_outlier_arrays
[docs]def determine_outlier_index_arrays(Ih_table, method="standard", zmax=6.0, target=None): """ Run an outlier algorithm and return the outlier indices. Args: Ih_table: A dials.algorithms.scaling.Ih_table.IhTable. method (str): Name (alias) of outlier rejection algorithm to use. If method=target, then the optional argument target must also be specified. Implemented methods; standard, simple, target. zmax (float): Normalised deviation threshold for classifying an outlier. target (Optional[IhTable]): An IhTable to use to obtain target Ih for outlier rejectiob, if method=target. Returns: outlier_index_arrays (list): A list of flex.size_t arrays, with one array per dataset that was used to create the Ih_table. Importantly, the indices are the indices of the reflections in the initial reflection table used to create the Ih_table, not the indices of the data in the Ih_table. Raises: ValueError: if an invalid choice is made for the method. """ outlier_rej = None if method == "standard": outlier_rej = NormDevOutlierRejection(Ih_table, zmax) elif method == "simple": outlier_rej = SimpleNormDevOutlierRejection(Ih_table, zmax) elif method == "target": assert target is not None outlier_rej = TargetedOutlierRejection(Ih_table, zmax, target) elif method is not None: raise ValueError(f"Invalid choice of outlier rejection method: {method}") if not outlier_rej: return [flex.size_t([]) for _ in range(Ih_table.n_datasets)] outlier_rej.run() outlier_index_arrays = outlier_rej.final_outlier_arrays if Ih_table.n_datasets > 1: msg = ( "Combined outlier rejection has been performed across multiple datasets, \n" ) else: msg = "A round of outlier rejection has been performed, \n" n_outliers = sum(len(i) for i in outlier_index_arrays) msg += f"{n_outliers} outliers have been identified. \n" logger.info(msg) return outlier_index_arrays
[docs]class OutlierRejectionBase: """ Base class for outlier rejection algorithms using an IhTable datastructure. Subclasses must implement the _do_outlier_rejection method, which must add the indices of outliers to the _outlier_indices attribute. The algorithms are run upon initialisation and result in the population of the :obj:`final_outlier_arrays`. Attributes: final_outlier_arrays (:obj:`list`): A list of flex.size_t arrays of outlier indices w.r.t. the order of the initial reflection tables used to create the Ih_table. """
[docs] def __init__(self, Ih_table, zmax): """Set up and run the outlier rejection algorithm.""" assert ( Ih_table.n_work_blocks == 1 ), """ Outlier rejection algorithms require an Ih_table with nblocks = 1""" # Note: could be possible to code for nblocks > 1 self._Ih_table_block = Ih_table.blocked_data_list[0] self._n_datasets = Ih_table.n_datasets self._block_selections = Ih_table.blocked_selection_list[0] self._datasets = np.array([], dtype=np.uint64).reshape((0,)) # flex.int([]) self._zmax = zmax self._outlier_indices = np.array([], dtype=np.uint64).reshape( (0,) ) # flex.size_t([]) self.final_outlier_arrays = None
[docs] def run(self): """Run the outlier rejection algorithm, implemented by a subclass.""" self._do_outlier_rejection() self.final_outlier_arrays = self._determine_outlier_indices()
def _determine_outlier_indices(self): """ Determine outlier indices with respect to the input reflection tables. Transform the outlier indices w.r.t the Ih_table, determined during the algorithm, to outlier indices w.r.t the initial reflection tables used to create the Ih_table, separated by reflection table. Returns: final_outlier_arrays (:obj:`list`): A list of flex.size_t arrays of outlier indices w.r.t. the order of the data in the initial reflection tables used to create the Ih_table. """ if self._n_datasets == 1: return [self._outlier_indices] final_outlier_arrays = [] for i in range(self._n_datasets): final_outlier_arrays.append(self._outlier_indices[self._datasets == i]) return final_outlier_arrays def _do_outlier_rejection(self): """Add indices (w.r.t. the Ih_table data) to self._outlier_indices.""" raise NotImplementedError()
[docs]class TargetedOutlierRejection(OutlierRejectionBase): """Implementation of an outlier rejection algorithm against a target. This algorithm requires a target Ih_table in addition to an Ih_table for the dataset under investigation. Normalised deviations are calculated from the intensity values in the target table. """
[docs] def __init__(self, Ih_table, zmax, target): """Set a target Ih_table and run the outlier rejection.""" assert ( target.n_work_blocks == 1 ), """ Targeted outlier rejection requires a target Ih_table with nblocks = 1""" self._target_Ih_table_block = target.blocked_data_list[0] self._target_Ih_table_block.calc_Ih() super().__init__(Ih_table, zmax)
def _do_outlier_rejection(self): """Add indices (w.r.t. the Ih_table data) to self._outlier_indices.""" Ih_table = self._Ih_table_block target = self._target_Ih_table_block target_asu_Ih_dict = dict( zip(target.asu_miller_index, zip(target.Ih_values, target.variances)) ) Ih_table.Ih_table["target_Ih_value"] = np.zeros(Ih_table.size) Ih_table.Ih_table["target_Ih_sigmasq"] = np.zeros(Ih_table.size) for j, miller_idx in enumerate(Ih_table.asu_miller_index): if miller_idx in target_asu_Ih_dict: Ih_table.Ih_table.loc[j, "target_Ih_value"] = target_asu_Ih_dict[ miller_idx ][0] Ih_table.Ih_table.loc[j, "target_Ih_sigmasq"] = target_asu_Ih_dict[ miller_idx ][1] nz_sel = Ih_table.Ih_table["target_Ih_value"].to_numpy() != 0.0 Ih_table = Ih_table.select(nz_sel) norm_dev = ( Ih_table.intensities - ( Ih_table.inverse_scale_factors * Ih_table.Ih_table["target_Ih_value"].to_numpy() ) ) / ( np.sqrt( Ih_table.variances + ( np.square(Ih_table.inverse_scale_factors) * Ih_table.Ih_table["target_Ih_sigmasq"].to_numpy() ) ) ) outliers_sel = np.abs(norm_dev) > self._zmax outliers_isel = np.nonzero(nz_sel)[0][outliers_sel] outliers = np.full(self._Ih_table_block.size, False) outliers[outliers_isel] = True self._outlier_indices = np.concatenate( [ self._outlier_indices, self._Ih_table_block.Ih_table["loc_indices"].iloc[outliers], ] ) self._datasets = np.concatenate( [ self._datasets, self._Ih_table_block.Ih_table["dataset_id"].iloc[outliers], ] )
[docs]class SimpleNormDevOutlierRejection(OutlierRejectionBase): """Algorithm using normalised deviations from the weighted intensity means. In this case, the weighted mean is calculated from all reflections in the symmetry group excluding the test reflection. """
[docs] def __init__(self, Ih_table, zmax): super().__init__(Ih_table, zmax) self.weights = flumpy.to_numpy( limit_outlier_weights( copy.deepcopy(self._Ih_table_block.weights), self._Ih_table_block.h_index_matrix, ) )
def _do_outlier_rejection(self): """Add indices (w.r.t. the Ih_table data) to self._outlier_indices.""" Ih_table = self._Ih_table_block intensity = Ih_table.intensities g = Ih_table.inverse_scale_factors w = self.weights wgIsum = Ih_table.sum_in_groups(w * g * intensity, output="per_refl") wg2sum = Ih_table.sum_in_groups(w * g * g, output="per_refl") # guard against zero division errors - can happen due to rounding errors # or bad data giving g values are very small zero_sel = wg2sum == 0.0 # set as one for now, then mark as outlier below. This will only affect if # g is near zero, if w is zero then throw an assertionerror. wg2sum[zero_sel] = 1.0 assert np.all(w > 0) # guard against division by zero norm_dev = (intensity - (g * wgIsum / wg2sum)) / ( np.sqrt((1.0 / w) + np.square(g / wg2sum)) ) norm_dev[zero_sel] = 1000 # to trigger rejection outliers = np.abs(norm_dev) > self._zmax self._outlier_indices = np.concatenate( [ self._outlier_indices, self._Ih_table_block.Ih_table["loc_indices"].iloc[outliers].to_numpy(), ] ) self._datasets = np.concatenate( [ self._datasets, self._Ih_table_block.Ih_table["dataset_id"].iloc[outliers].to_numpy(), ] )
[docs]class NormDevOutlierRejection(OutlierRejectionBase): """Algorithm using normalised deviations from the weighted intensity means. In this case, the weighted mean is calculated from all reflections in the symmetry group excluding the test reflection. """
[docs] def __init__(self, Ih_table, zmax): super().__init__(Ih_table, zmax) self.weights = flumpy.to_numpy( limit_outlier_weights( copy.deepcopy(self._Ih_table_block.weights), self._Ih_table_block.h_index_matrix, ) )
def _do_outlier_rejection(self): """Add indices (w.r.t. the Ih_table data) to self._outlier_indices.""" self._round_of_outlier_rejection() n_outliers = self._outlier_indices.size n_new_outliers = n_outliers while n_new_outliers: self._round_of_outlier_rejection() n_new_outliers = self._outlier_indices.size - n_outliers n_outliers = self._outlier_indices.size def _round_of_outlier_rejection(self): """ Calculate normal deviations from the data in the Ih_table. """ Ih_table = self._Ih_table_block intensity = Ih_table.intensities g = Ih_table.inverse_scale_factors w = self.weights wgIsum = Ih_table.sum_in_groups(w * g * intensity, output="per_refl") wg2sum = Ih_table.sum_in_groups(w * g * g, output="per_refl") wgIsum_others = wgIsum - (w * g * intensity) wg2sum_others = wg2sum - (w * g * g) # Now do the rejection analysis if n_in_group > 2 nh = Ih_table.calc_nh() sel = nh > 2 wg2sum_others_sel = wg2sum_others[sel] wgIsum_others_sel = wgIsum_others[sel] # guard against zero division errors - can happen due to rounding errors # or bad data giving g values are very small zero_sel = wg2sum_others_sel == 0.0 # set as one for now, then mark as outlier below. This will only affect if # g is near zero, if w is zero then throw an assertionerror. wg2sum_others_sel[zero_sel] = 1.0 g_sel = g[sel] I_sel = intensity[sel] w_sel = w[sel] assert np.all(w_sel > 0) # guard against division by zero norm_dev = (I_sel - (g_sel * wgIsum_others_sel / wg2sum_others_sel)) / ( np.sqrt((1.0 / w_sel) + (np.square(g_sel) / wg2sum_others_sel)) ) norm_dev[zero_sel] = 1000 # to trigger rejection z_score = np.abs(norm_dev) # Want an array same size as Ih table. all_z_scores = np.zeros(Ih_table.size) # all_z_scores.set_selected(sel.iselection(), z_score) all_z_scores[sel] = z_score outlier_indices, other_potential_outliers = determine_outlier_indices( Ih_table.h_index_matrix, flumpy.from_numpy(all_z_scores), self._zmax ) sel = np.full(Ih_table.size, False, dtype=bool) outlier_indices = flumpy.to_numpy(outlier_indices) sel[outlier_indices] = True lsel = self._Ih_table_block.Ih_table["loc_indices"].iloc[sel].to_numpy() dsel = self._Ih_table_block.Ih_table["dataset_id"].iloc[sel].to_numpy() self._outlier_indices = np.concatenate( [ self._outlier_indices, lsel, ] ) self._datasets = np.concatenate( [ self._datasets, dsel, ] ) sel = np.full(Ih_table.size, False, dtype=bool) sel[flumpy.to_numpy(other_potential_outliers)] = True self._Ih_table_block = self._Ih_table_block.select(sel) self.weights = self.weights[sel]