"""
Definitions of scaling models.
A scaling model is a collection of scaling model components with appropriate
methods to define how these are composed into one model.
"""
import logging
from libtbx import Auto, phil
from dials.algorithms.scaling.error_model.error_model import BasicErrorModel
from dials.algorithms.scaling.model.components.scale_components import (
LinearDoseDecay,
QuadraticDoseDecay,
SHScaleComponent,
SingleBScaleFactor,
SingleScaleFactor,
)
from dials.algorithms.scaling.model.components.smooth_scale_components import (
SmoothBScaleComponent1D,
SmoothScaleComponent1D,
SmoothScaleComponent2D,
SmoothScaleComponent3D,
)
from dials.algorithms.scaling.plots import (
plot_absorption_parameters,
plot_absorption_plots,
plot_array_absorption_plot,
plot_array_decay_plot,
plot_array_modulation_plot,
plot_dose_decay,
plot_relative_Bs,
plot_smooth_scales,
)
from dials.algorithms.scaling.scaling_utilities import sph_harm_table
from dials.array_family import flex
from dials_scaling_ext import (
calc_lookup_index,
calc_theta_phi,
create_sph_harm_lookup_table,
)
logger = logging.getLogger("dials")
import pkg_resources
base_model_phil_str = """\
correction.fix = None
.type = strings
.help = "If specified, this correction will not be refined in this scaling run"
"""
kb_model_phil_str = (
"""\
decay_correction = True
.type = bool
.help = "Option to turn off decay correction (for physical/array/KB
default models)."
.expert_level = 1
"""
+ base_model_phil_str
)
dose_decay_model_phil_str = (
"""\
scale_interval = 2.0
.type = float(value_min=1.0)
.help = "Rotation (phi) interval between model parameters for the scale"
"component."
.expert_level = 1
relative_B_correction = True
.type = bool
.help = "Option to turn off relative B correction."
.expert_level = 1
decay_correction = True
.type = bool
.help = "Option to turn off decay correction."
.expert_level = 1
share.decay = True
.type = bool
.help = "Share the decay model between sweeps."
.expert_level = 1
resolution_dependence = *quadratic linear
.type = choice
.help = "Use a dose model that depends linearly or quadratically on 1/d"
.expert_level = 1
absorption_correction = False
.type = bool
.help = "Option to turn on spherical harmonic absorption correction."
.expert_level = 1
lmax = 4
.type = int(value_min=2)
.help = "Number of spherical harmonics to include for absorption"
"correction, recommended to be no more than 6."
.expert_level = 2
surface_weight = 1e6
.type = float(value_min=0.0)
.help = "Restraint weight applied to spherical harmonic terms in the"
"absorption correction."
.expert_level = 2
fix_initial = True
.type = bool
.help = "If performing full matrix minimisation, in the final cycle,"
"constrain the initial parameter for more reliable parameter and"
"scale factor error estimates."
.expert_level = 2
"""
+ base_model_phil_str
)
physical_model_phil_str = (
"""\
scale_interval = auto
.type = float(value_min=1.0)
.help = "Rotation (phi) interval between model parameters for the scale"
"component (auto scales interval depending on oscillation range)."
.expert_level = 1
decay_correction = True
.type = bool
.help = "Option to turn off decay correction."
.expert_level = 1
decay_interval = auto
.type = float(value_min=1.0)
.help = "Rotation (phi) interval between model parameters for the decay"
"component (auto scales interval depending on oscillation range)."
.expert_level = 1
decay_restraint = 1e-1
.type = float(value_min=0.0)
.help = "Weight to weakly restrain B-values to 0."
.expert_level = 2
absorption_correction = auto
.type = bool
.help = "Option to turn off absorption correction (default True if oscillation > 60.0)."
.expert_level = 1
absorption_level = low medium high
.type = choice
.help = "Expected degree of relative absorption for different scattering"
"paths through the crystal(s). If an option is selected, the"
"scaling model parameters lmax and surface_weight will be set to"
"appropriate values."
"Relative absorption increases as crystal size increases,"
"increases as wavelength increases and is increased as the crystal"
"dimensions become less equal (i.e. is higher for needle shaped"
"crystals and zero for a spherical crystal)."
"Definitions of the levels and approximate correction magnitude:"
"low: ~1%% relative absorption, expected for typical protein"
" crystals (containing no strongly absorbing atoms) on the"
" order of ~100um measured at ~1A wavelength."
"medium: ~5%% relative absorption"
"high: >25%% relative absorption, e.g. for measurements at long"
" wavelength or crystals with high absorption from heavy atoms."
.expert_level = 1
lmax = auto
.type = int(value_min=2)
.help = "Number of spherical harmonics to include for absorption"
"correction, defaults to 4 if no absorption_level is chosen."
"It is recommended that the value need be no more than 6."
.expert_level = 1
surface_weight = auto
.type = float(value_min=0.0)
.help = "Restraint weight applied to spherical harmonic terms in the"
"absorption correction. A lower restraint allows a higher amount"
"of absorption correction. Defaults to 5e5 if no absorption_level"
"is chosen."
.expert_level = 1
share.absorption = False
.type = bool
.help = "If True, a common absorption correction is refined across all sweeps".
fix_initial = True
.type = bool
.help = "If performing full matrix minimisation, in the final cycle,"
"constrain the initial parameter for more reliable parameter and"
"scale factor error estimates."
.expert_level = 2
"""
+ base_model_phil_str
)
array_model_phil_str = (
"""\
decay_correction = True
.type = bool
.help = "Option to turn off decay correction (a 2D grid of parameters as"
"a function of rotation and resolution (d-value))."
.expert_level = 1
decay_interval = 20.0
.type = float(value_min=1.0)
.help = "Rotation (phi) interval between model parameters for the decay"
"and absorption corrections."
.expert_level = 1
n_resolution_bins = 10
.type = int(value_min=1)
.help = "Number of resolution bins to use for the decay term."
.expert_level = 1
absorption_correction = True
.type = bool
.help = "Option to turn off absorption correction (a 3D grid of"
"parameters as a function of rotation angle, detector-x and"
"detector-y position)."
.expert_level = 1
n_absorption_bins = 3
.type = int(value_min=1)
.help = "Number of bins in each dimension (applied to both x and y) for"
"binning the detector position for the absorption term of the"
"array model."
.expert_level = 1
modulation_correction = False
.type = bool
.help = "Option to turn on a detector correction for the array default"
"model."
.expert_level = 2
n_modulation_bins = 20
.type = int(value_min=1)
.help = "Number of bins in each dimension (applied to both x and y) for"
"binning the detector position for the modulation correction."
.expert_level = 2
"""
+ base_model_phil_str
)
autos = [Auto, "auto", "Auto"]
[docs]class ScalingModelBase:
"""Abstract base class for scaling models."""
id_ = None
[docs] def __init__(self, configdict, is_scaled=False):
"""Initialise the model with no components and a :obj:`configdict`."""
if not configdict["corrections"]:
raise ValueError("No model components created.")
self._components = {}
self._configdict = configdict
self._is_scaled = is_scaled
self._error_model = None
self._fixed_components = []
@property
def is_scaled(self):
""":obj:`bool`: Indicate whether this model has previously been refined."""
return self._is_scaled
[docs] def fix_initial_parameter(self, params):
"""Fix a parameter of the scaling model."""
return False
@property
def fixed_components(self):
return self._fixed_components
@fixed_components.setter
def fixed_components(self, components):
self._fixed_components = components
[docs] def limit_image_range(self, new_image_range):
"""Modify the model if necessary due to reducing the image range.
Args:
new_image_range (tuple): The (start, end) of the new image range.
"""
pass
[docs] def set_scaling_model_as_scaled(self):
"""Set the boolean 'is_scaled' flag as True."""
self._is_scaled = True
[docs] def set_scaling_model_as_unscaled(self):
"""Set the boolean 'is_scaled' flag as False."""
self._is_scaled = False
[docs] def plot_model_components(self, reflection_table=None):
"""Return a dict of plots for plotting model components with plotly."""
return {}
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""Create the model from input data."""
raise NotImplementedError()
[docs] def set_valid_image_range(self, image_range):
"""Set the valid image range for the model in the :obj:`configdict`."""
self._configdict["valid_image_range"] = image_range
@property
def error_model(self):
""":obj:`error_model`: The error model associated with the scaling model."""
return self._error_model
@property
def configdict(self):
""":obj:`dict`: a dictionary of the model configuration parameters."""
return self._configdict
@property
def components(self):
""":obj:`dict`: a dictionary of the model components."""
return self._components
@property
def n_params(self):
""":obj:`dict`: a dictionary of the model components."""
return sum(c.parameters.size() for c in self._components.values())
[docs] def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names.
This list indicates to the scaler the order to perform scaling in
consecutive scaling mode.
e.g. [['scale', 'decay'], ['absorption']] would cause the first cycle to
refine scale and decay, and then absorption in a subsequent cycle.
"""
raise NotImplementedError()
[docs] def to_dict(self):
"""Serialize the model to a dictionary.
Returns:
dict: A dictionary representation of the model.
"""
dictionary = {"__id__": self.id_}
for key in self.components:
dictionary[key] = {
"n_parameters": self._components[key].n_params,
"parameters": list(self._components[key].parameters),
"null_parameter_value": self._components[key].null_parameter_value,
}
if self._components[key].parameter_esds:
dictionary[key]["est_standard_devs"] = list(
self._components[key].parameter_esds
)
dictionary["configuration_parameters"] = self._configdict
return dictionary
[docs] @classmethod
def from_dict(cls, obj):
"""Create a scaling model from a dictionary."""
raise NotImplementedError()
[docs] def update(self, model_params):
pass
[docs] def load_error_model(self, error_params):
# load existing model if there, but use user-specified values if given
new_model = None
if (
"error_model_type" in self._configdict
and not error_params.reset_error_model
):
if self._configdict["error_model_type"] == "BasicErrorModel":
p = self._configdict["error_model_parameters"]
a = None
b = None
if not error_params.basic.a:
a = p[0]
if not error_params.basic.b:
b = p[1]
new_model = BasicErrorModel(a, b, error_params.basic)
if not new_model:
new_model = BasicErrorModel(basic_params=error_params.basic)
logger.info(f"Loaded error model: {new_model}")
self.set_error_model(new_model)
[docs] def set_error_model(self, error_model):
"""Associate an error model with the dataset."""
self._error_model = error_model
self._configdict.update(
{
"error_model_type": self.error_model.__class__.__name__,
"error_model_parameters": list(error_model.parameters),
}
)
[docs] def record_intensity_combination_Imid(self, Imid):
"""Record the intensity combination Imid value."""
self._configdict["Imid"] = Imid
[docs] def get_shared_components(self):
return None
def __str__(self):
""":obj:`str`: Return a string representation of a scaling model."""
msg = ["Scaling model:"]
msg.append(" type : " + str(self.id_))
for name, component in self.components.items():
msg.append(" " + str(name).capitalize() + " component:")
if component.parameter_esds:
msg.append(" parameters (sigma)")
for p, e in zip(component.parameters, component.parameter_esds):
if p < 0.0:
msg.append(f" {p:.4f} ({e:.4f})")
else:
msg.append(f" {p:.4f} ({e:.4f})")
else:
msg.append(" parameters")
for p in component.parameters:
if p < 0.0:
msg.append(f" {p:.4f}")
else:
msg.append(f" {p:.4f}")
msg.append("")
return "\n".join(msg)
def _add_absorption_component_to_physically_derived_model(model, reflection_table):
lmax = model.configdict["lmax"]
if reflection_table.size() > 100000:
assert "s0c" in reflection_table
assert "s1c" in reflection_table
theta_phi_0 = calc_theta_phi(
reflection_table["s0c"]
) # array of tuples in radians
theta_phi_1 = calc_theta_phi(reflection_table["s1c"])
s0_lookup_index = calc_lookup_index(theta_phi_0, points_per_degree=2)
s1_lookup_index = calc_lookup_index(theta_phi_1, points_per_degree=2)
if SHScaleComponent.coefficients_list is None:
SHScaleComponent.coefficients_list = create_sph_harm_lookup_table(
lmax, points_per_degree=2
) # set the class variable and share
elif len(SHScaleComponent.coefficients_list) < (lmax * (2.0 + lmax)):
# this (rare) case can happen if adding a new dataset with a larger lmax!
SHScaleComponent.coefficients_list = create_sph_harm_lookup_table(
lmax, points_per_degree=2
) # set the class variable and share
model.components["absorption"].data = {
"s0_lookup": s0_lookup_index,
"s1_lookup": s1_lookup_index,
}
# here just pass in good reflections
else:
model.components["absorption"].data = {
"sph_harm_table": sph_harm_table(reflection_table, lmax)
}
surface_weight = model.configdict["abs_surface_weight"]
parameter_restraints = flex.double([])
for i in range(1, lmax + 1):
parameter_restraints.extend(flex.double([1.0] * ((2 * i) + 1)))
parameter_restraints *= surface_weight
model.components["absorption"].parameter_restraints = parameter_restraints
[docs]class DoseDecay(ScalingModelBase):
"""A model similar to the physical model, where an exponential decay
component is used plus a relative B-factor per sweep, with no absorption
surface by default. Most suitable for multi-crystal datasets."""
id_ = "dose_decay"
phil_scope = phil.parse(dose_decay_model_phil_str)
[docs] def __init__(self, parameters_dict, configdict, is_scaled=False):
"""Create the physical scaling model components."""
super().__init__(configdict, is_scaled)
if "scale" in configdict["corrections"]:
scale_setup = parameters_dict["scale"]
self._components["scale"] = SmoothScaleComponent1D(
scale_setup["parameters"], scale_setup["parameter_esds"]
)
if "decay" in configdict["corrections"]:
decay_setup = parameters_dict["decay"]
if configdict["resolution_dependence"] == "linear":
self._components["decay"] = LinearDoseDecay(
decay_setup["parameters"], decay_setup["parameter_esds"]
)
else:
self._components["decay"] = QuadraticDoseDecay(
decay_setup["parameters"], decay_setup["parameter_esds"]
)
if "relative_B" in configdict["corrections"]:
B_setup = parameters_dict["relative_B"]
self._components["relative_B"] = SingleBScaleFactor(
B_setup["parameters"], B_setup["parameter_esds"]
)
if "absorption" in configdict["corrections"]:
absorption_setup = parameters_dict["absorption"]
self._components["absorption"] = SHScaleComponent(
absorption_setup["parameters"], absorption_setup["parameter_esds"]
)
@property
def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names to indicate scaling order."""
return [["scale", "relative_B", "decay"], ["absorption"]]
[docs] def fix_initial_parameter(self, params):
if "scale" in self.components and params.dose_decay.fix_initial:
self.components["scale"].fix_initial_parameter()
return True
[docs] def get_shared_components(self):
if "shared" in self.configdict:
if (
"decay" in self.configdict["shared"]
and "decay" not in self.fixed_components
):
return "decay"
return None
[docs] def update(self, params):
if params.dose_decay.correction.fix:
self.fixed_components = params.dose_decay.correction.fix
[docs] def limit_image_range(self, new_image_range):
"""Modify the model to be suitable for a reduced image range.
For this model, this involves determining whether the number of parameters
should be reduced and may reduce the number of parameters in the scale and
decay components.
Args:
new_image_range (tuple): The (start, end) of the new image range.
"""
conf = self.configdict
current_image_range = conf["valid_image_range"]
current_osc_range = conf["valid_osc_range"]
# calculate one osc as don't have access to scan object here
one_osc = (conf["valid_osc_range"][1] - conf["valid_osc_range"][0]) / (
conf["valid_image_range"][1] - (conf["valid_image_range"][0] - 1)
)
new_osc_range = (
(new_image_range[0] - current_image_range[0]) * one_osc,
(new_image_range[1] - current_image_range[0] + 1) * one_osc,
)
if "scale" in self.components:
n_param, s_norm_fac, scale_rot_int = initialise_smooth_input(
new_osc_range, one_osc, conf["scale_rot_interval"]
)
n_old_params = len(self.components["scale"].parameters)
conf["scale_rot_interval"] = scale_rot_int
conf["s_norm_fac"] = s_norm_fac
offset = calculate_new_offset(
current_image_range[0],
new_image_range[0],
s_norm_fac,
n_old_params,
n_param,
)
new_params = self.components["scale"].parameters[offset : offset + n_param]
self.components["scale"].set_new_parameters(new_params)
new_osc_range_0 = current_osc_range[0] + (
(new_image_range[0] - current_image_range[0]) * one_osc
)
new_osc_range_1 = current_osc_range[1] + (
(new_image_range[1] - current_image_range[1]) * one_osc
)
self._configdict["valid_osc_range"] = (new_osc_range_0, new_osc_range_1)
self.set_valid_image_range(new_image_range)
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""Create the scaling model defined by the params."""
params = params.dose_decay
configdict = {"corrections": []}
parameters_dict = {}
osc_range = experiment.scan.get_oscillation_range()
one_osc_width = experiment.scan.get_oscillation()[1]
configdict.update({"valid_osc_range": osc_range})
if params.share.decay:
configdict.update({"shared": ["decay"]})
configdict["corrections"].append("scale")
n_scale_param, s_norm_fac, scale_rot_int = initialise_smooth_input(
osc_range, one_osc_width, params.scale_interval
)
configdict.update(
{"s_norm_fac": s_norm_fac, "scale_rot_interval": scale_rot_int}
)
parameters_dict["scale"] = {
"parameters": flex.double(n_scale_param, 1.0),
"parameter_esds": None,
}
if params.relative_B_correction:
configdict["corrections"].append("relative_B")
parameters_dict["relative_B"] = {
"parameters": flex.double(1, 0.0),
"parameter_esds": None,
}
if params.decay_correction:
configdict["corrections"].append("decay")
configdict.update({"resolution_dependence": params.resolution_dependence})
parameters_dict["decay"] = {
"parameters": flex.double(1, 0.0),
"parameter_esds": None,
}
if params.absorption_correction:
configdict["corrections"].append("absorption")
n_abs_param = params.lmax * (
2 + params.lmax
) # arithmetic sum formula (a1=3, d=2)
configdict.update({"lmax": params.lmax})
configdict.update({"abs_surface_weight": params.surface_weight})
parameters_dict["absorption"] = {
"parameters": flex.double(n_abs_param, 0.0),
"parameter_esds": None,
}
model = cls(parameters_dict, configdict)
if params.correction.fix:
model.fixed_components = params.correction.fix
return model
[docs] @classmethod
def from_dict(cls, obj):
"""Create a :obj:`PhysicalScalingModel` from a dictionary."""
if obj["__id__"] != cls.id_:
raise RuntimeError(f"expected __id__ {cls.id_}, got {obj['__id__']}")
(s_params, d_params, abs_params, B) = (None, None, None, None)
(s_params_sds, d_params_sds, a_params_sds, B_sd) = (None, None, None, None)
configdict = obj["configuration_parameters"]
if "scale" in configdict["corrections"]:
s_params = flex.double(obj["scale"]["parameters"])
if "est_standard_devs" in obj["scale"]:
s_params_sds = flex.double(obj["scale"]["est_standard_devs"])
if "relative_B" in configdict["corrections"]:
B = flex.double(obj["relative_B"]["parameters"])
if "est_standard_devs" in obj["relative_B"]:
B_sd = flex.double(obj["relative_B"]["est_standard_devs"])
if "decay" in configdict["corrections"]:
d_params = flex.double(obj["decay"]["parameters"])
if "est_standard_devs" in obj["decay"]:
d_params_sds = flex.double(obj["decay"]["est_standard_devs"])
if "absorption" in configdict["corrections"]:
abs_params = flex.double(obj["absorption"]["parameters"])
if "est_standard_devs" in obj["absorption"]:
a_params_sds = flex.double(obj["absorption"]["est_standard_devs"])
parameters_dict = {
"scale": {"parameters": s_params, "parameter_esds": s_params_sds},
"decay": {"parameters": d_params, "parameter_esds": d_params_sds},
"relative_B": {"parameters": B, "parameter_esds": B_sd},
"absorption": {"parameters": abs_params, "parameter_esds": a_params_sds},
}
return cls(parameters_dict, configdict, is_scaled=True)
[docs] def plot_model_components(self, reflection_table=None):
d = plot_dose_decay(self)
if "absorption" in self.components:
d.update(plot_absorption_parameters(self))
d.update(plot_absorption_plots(self, reflection_table))
return d
[docs]def determine_auto_absorption_params(absorption):
if absorption == "high":
lmax = 6
surface_weight = 5e3
elif absorption == "medium":
lmax = 6
surface_weight = 5e4
else: # low
lmax = 4
surface_weight = 5e5
return lmax, surface_weight
[docs]class PhysicalScalingModel(ScalingModelBase):
"""A scaling model for a physical parameterisation."""
id_ = "physical"
phil_scope = phil.parse(physical_model_phil_str)
[docs] def __init__(self, parameters_dict, configdict, is_scaled=False):
"""Create the physical scaling model components."""
super().__init__(configdict, is_scaled)
if "scale" in configdict["corrections"]:
scale_setup = parameters_dict["scale"]
self._components["scale"] = SmoothScaleComponent1D(
scale_setup["parameters"], scale_setup["parameter_esds"]
)
if "decay" in configdict["corrections"]:
decay_setup = parameters_dict["decay"]
self._components["decay"] = SmoothBScaleComponent1D(
decay_setup["parameters"], decay_setup["parameter_esds"]
)
if "absorption" in configdict["corrections"]:
absorption_setup = parameters_dict["absorption"]
self._components["absorption"] = SHScaleComponent(
absorption_setup["parameters"], absorption_setup["parameter_esds"]
)
@property
def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names to indicate scaling order."""
return [["scale", "decay"], ["absorption"]]
[docs] def fix_initial_parameter(self, params):
if "scale" in self.components and params.physical.fix_initial:
self.components["scale"].fix_initial_parameter()
return True
[docs] def limit_image_range(self, new_image_range):
"""Modify the model to be suitable for a reduced image range.
For this model, this involves determining whether the number of parameters
should be reduced and may reduce the number of parameters in the scale and
decay components.
Args:
new_image_range (tuple): The (start, end) of the new image range.
"""
conf = self.configdict
current_image_range = conf["valid_image_range"]
current_osc_range = conf["valid_osc_range"]
# calculate one osc as don't have access to scan object here
one_osc = (conf["valid_osc_range"][1] - conf["valid_osc_range"][0]) / (
conf["valid_image_range"][1] - (conf["valid_image_range"][0] - 1)
)
new_osc_range = (
(new_image_range[0] - current_image_range[0]) * one_osc,
(new_image_range[1] - current_image_range[0] + 1) * one_osc,
)
if "scale" in self.components:
n_param, s_norm_fac, scale_rot_int = initialise_smooth_input(
new_osc_range, one_osc, conf["scale_rot_interval"]
)
n_old_params = len(self.components["scale"].parameters)
conf["scale_rot_interval"] = scale_rot_int
conf["s_norm_fac"] = s_norm_fac
offset = calculate_new_offset(
current_image_range[0],
new_image_range[0],
s_norm_fac,
n_old_params,
n_param,
)
new_params = self.components["scale"].parameters[offset : offset + n_param]
self.components["scale"].set_new_parameters(new_params)
if "decay" in self.components:
n_param, d_norm_fac, decay_rot_int = initialise_smooth_input(
new_osc_range, one_osc, conf["decay_rot_interval"]
)
n_old_params = len(self.components["decay"].parameters)
conf["decay_rot_interval"] = decay_rot_int
conf["d_norm_fac"] = d_norm_fac
offset = calculate_new_offset(
current_image_range[0],
new_image_range[0],
d_norm_fac,
n_old_params,
n_param,
)
new_params = self.components["decay"].parameters[offset : offset + n_param]
self.components["decay"].set_new_parameters(new_params)
new_osc_range_0 = current_osc_range[0] + (
(new_image_range[0] - current_image_range[0]) * one_osc
)
new_osc_range_1 = current_osc_range[1] + (
(new_image_range[1] - current_image_range[1]) * one_osc
)
self._configdict["valid_osc_range"] = (new_osc_range_0, new_osc_range_1)
self.set_valid_image_range(new_image_range)
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""Create the scaling model defined by the params."""
params = params.physical
configdict = {"corrections": []}
parameters_dict = {}
osc_range = experiment.scan.get_oscillation_range()
one_osc_width = experiment.scan.get_oscillation()[1]
configdict.update({"valid_osc_range": osc_range})
abs_osc_range = abs(osc_range[1] - osc_range[0])
if params.scale_interval in autos or params.decay_interval in autos:
if abs_osc_range < 5.0:
scale_interval, decay_interval = (1.0, 1.5)
elif abs_osc_range < 10.0:
scale_interval, decay_interval = (2.0, 3.0)
elif abs_osc_range < 25.0:
scale_interval, decay_interval = (4.0, 5.0)
elif abs_osc_range < 90.0:
scale_interval, decay_interval = (8.0, 10.0)
else:
scale_interval, decay_interval = (15.0, 20.0)
if params.scale_interval not in autos:
scale_interval = params.scale_interval
if params.decay_interval not in autos:
decay_interval = params.decay_interval
configdict["corrections"].append("scale")
n_scale_param, s_norm_fac, scale_rot_int = initialise_smooth_input(
osc_range, one_osc_width, scale_interval
)
configdict.update(
{"s_norm_fac": s_norm_fac, "scale_rot_interval": scale_rot_int}
)
parameters_dict["scale"] = {
"parameters": flex.double(n_scale_param, 1.0),
"parameter_esds": None,
}
if params.decay_correction:
configdict["corrections"].append("decay")
n_decay_param, d_norm_fac, decay_rot_int = initialise_smooth_input(
osc_range, one_osc_width, decay_interval
)
configdict.update(
{"d_norm_fac": d_norm_fac, "decay_rot_interval": decay_rot_int}
)
parameters_dict["decay"] = {
"parameters": flex.double(n_decay_param, 0.0),
"parameter_esds": None,
}
if params.absorption_correction in autos:
if abs_osc_range > 60.0:
absorption_correction = True
else:
absorption_correction = False
else:
absorption_correction = params.absorption_correction
if absorption_correction or params.absorption_level:
configdict["corrections"].append("absorption")
if params.share.absorption:
configdict.update({"shared": ["absorption"]})
if params.absorption_level:
lmax, surface_weight = determine_auto_absorption_params(
params.absorption_level
)
if (params.lmax not in autos) or (params.surface_weight not in autos):
logger.info(
"""Using lmax, surface_weight parameters set by the absorption_level option,
rather than user specified options"""
)
else:
lmax, surface_weight = (params.lmax, params.surface_weight)
if lmax in autos:
lmax = 4
if params.surface_weight in autos:
surface_weight = 5e5
n_abs_param = (2 * lmax) + (lmax ** 2) # arithmetic sum formula (a1=3, d=2)
configdict.update({"lmax": lmax})
configdict.update({"abs_surface_weight": surface_weight})
parameters_dict["absorption"] = {
"parameters": flex.double(n_abs_param, 0.0),
"parameter_esds": None,
}
model = cls(parameters_dict, configdict)
if params.correction.fix:
model.fixed_components = params.correction.fix
return model
[docs] @classmethod
def from_dict(cls, obj):
"""Create a :obj:`PhysicalScalingModel` from a dictionary."""
if obj["__id__"] != cls.id_:
raise RuntimeError(f"expected __id__ {cls.id_}, got {obj['__id__']}")
(s_params, d_params, abs_params) = (None, None, None)
(s_params_sds, d_params_sds, a_params_sds) = (None, None, None)
configdict = obj["configuration_parameters"]
if "scale" in configdict["corrections"]:
s_params = flex.double(obj["scale"]["parameters"])
if "est_standard_devs" in obj["scale"]:
s_params_sds = flex.double(obj["scale"]["est_standard_devs"])
if "decay" in configdict["corrections"]:
d_params = flex.double(obj["decay"]["parameters"])
if "est_standard_devs" in obj["decay"]:
d_params_sds = flex.double(obj["decay"]["est_standard_devs"])
if "absorption" in configdict["corrections"]:
abs_params = flex.double(obj["absorption"]["parameters"])
if "est_standard_devs" in obj["absorption"]:
a_params_sds = flex.double(obj["absorption"]["est_standard_devs"])
parameters_dict = {
"scale": {"parameters": s_params, "parameter_esds": s_params_sds},
"decay": {"parameters": d_params, "parameter_esds": d_params_sds},
"absorption": {"parameters": abs_params, "parameter_esds": a_params_sds},
}
return cls(parameters_dict, configdict, is_scaled=True)
[docs] def update(self, params):
"""Update the model if new options chosen in the phil scope."""
if params.physical.correction.fix:
self.fixed_components = params.physical.correction.fix
if "absorption" in self.components:
new_lmax = None
if params.physical.absorption_level:
lmax, surface_weight = determine_auto_absorption_params(
params.physical.absorption_level
)
self._configdict.update({"abs_surface_weight": surface_weight})
if lmax != self._configdict["lmax"]:
new_lmax = lmax
else: # check manually specified parameters.
if params.physical.surface_weight not in autos:
self._configdict.update(
{"abs_surface_weight": params.physical.surface_weight}
)
if (params.physical.lmax not in autos) and (
params.physical.lmax != self._configdict["lmax"]
):
new_lmax = params.physical.lmax
if new_lmax:
# need to change the parameters for this component
current_parameters = self.components["absorption"].parameters
n_abs_param = new_lmax * (2 + new_lmax)
self._configdict.update({"lmax": new_lmax})
new_parameters = flex.double(n_abs_param, 0.0)
# copy across matching parameters:
for i in range(0, min(n_abs_param, current_parameters.size())):
new_parameters[i] = current_parameters[i]
self._components["absorption"] = SHScaleComponent(
new_parameters, flex.double(n_abs_param, 0.0)
)
if params.physical.share.absorption:
self._configdict.update({"shared": ["absorption"]})
[docs] def plot_model_components(self, reflection_table=None):
d = plot_smooth_scales(self)
if "absorption" in self.components:
d.update(plot_absorption_parameters(self))
d.update(plot_absorption_plots(self, reflection_table))
return d
[docs] def get_shared_components(self):
if "shared" in self.configdict:
if (
"absorption" in self.configdict["shared"]
and "absorption" not in self.fixed_components
):
return "absorption"
return None
[docs]class ArrayScalingModel(ScalingModelBase):
"""A scaling model for an array-based parameterisation."""
id_ = "array"
phil_scope = phil.parse(array_model_phil_str)
[docs] def __init__(self, parameters_dict, configdict, is_scaled=False):
"""Create the array scaling model components."""
super().__init__(configdict, is_scaled)
if not any(i in configdict["corrections"] for i in ["decay", "absorption"]):
raise ValueError(
"Array model must have at least one of decay or absorption corrections"
)
if "decay" in configdict["corrections"]:
decay_setup = parameters_dict["decay"]
self._components["decay"] = SmoothScaleComponent2D(
decay_setup["parameters"],
shape=(configdict["n_res_param"], configdict["n_time_param"]),
parameter_esds=decay_setup["parameter_esds"],
)
if "absorption" in configdict["corrections"]:
abs_setup = parameters_dict["absorption"]
self._components["absorption"] = SmoothScaleComponent3D(
abs_setup["parameters"],
shape=(
configdict["n_x_param"],
configdict["n_y_param"],
configdict["n_time_param"],
),
parameter_esds=abs_setup["parameter_esds"],
)
if "modulation" in configdict["corrections"]:
mod_setup = parameters_dict["modulation"]
self._components["modulation"] = SmoothScaleComponent2D(
mod_setup["parameters"],
shape=(configdict["n_x_mod_param"], configdict["n_y_mod_param"]),
parameter_esds=mod_setup["parameter_esds"],
)
@property
def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names to indicate scaling order."""
return [["decay"], ["absorption"], ["modulation"]]
[docs] def update(self, params):
if params.array.correction.fix:
self.fixed_components = params.array.correction.fix
[docs] def limit_image_range(self, new_image_range):
"""Modify the model to be suitable for a reduced image range.
For this model, this involves determining whether the number of parameters
should be reduced and may reduce the number of parameters in the absorption
and decay components.
Args:
new_image_range (tuple): The (start, end) of the new image range.
"""
conf = self.configdict
current_image_range = conf["valid_image_range"]
current_osc_range = conf["valid_osc_range"]
# calculate one osc as don't have access to scan object here
one_osc = (conf["valid_osc_range"][1] - conf["valid_osc_range"][0]) / (
conf["valid_image_range"][1] - (conf["valid_image_range"][0] - 1)
)
new_osc_range = (
(new_image_range[0] - current_image_range[0]) * one_osc,
(new_image_range[1] - current_image_range[0] + 1) * one_osc,
)
n_param, time_norm_fac, time_rot_int = initialise_smooth_input(
new_osc_range, one_osc, conf["time_rot_interval"]
)
if "decay" in self.components:
n_old_time_params = int(
len(self.components["decay"].parameters)
/ self.components["decay"].n_x_params
)
else:
n_old_time_params = int(
len(self.components["absorption"].parameters)
/ (
self.components["absorption"].n_x_params
* self.components["absorption"].n_y_params
)
)
offset = calculate_new_offset(
current_image_range[0],
new_image_range[0],
time_norm_fac,
n_old_time_params,
n_param,
)
if "absorption" in self.components:
params = self.components["absorption"].parameters
n_x_params = self.components["absorption"].n_x_params
n_y_params = self.components["absorption"].n_y_params
# can't do simple slice as 3-dim array
time_offset = offset * n_x_params * n_y_params
new_params = params[
time_offset : time_offset + (n_param * n_x_params * n_y_params)
]
self.components["absorption"].set_new_parameters(
new_params, shape=(n_x_params, n_y_params, n_param)
)
if "decay" in self.components:
params = self.components["decay"].parameters
n_decay_params = self.components["decay"].n_x_params
# can't do simple slice as 2-dim array
decay_offset = offset * n_decay_params
new_params = params[
decay_offset : decay_offset + (n_param * n_decay_params)
]
self.components["decay"].set_new_parameters(
new_params, shape=(n_decay_params, n_param)
)
self._configdict["n_time_param"] = n_param
self._configdict["time_norm_fac"] = time_norm_fac
self._configdict["time_rot_interval"] = time_rot_int
new_osc_range_0 = current_osc_range[0] + (
(new_image_range[0] - current_image_range[0]) * one_osc
)
new_osc_range_1 = current_osc_range[1] + (
(new_image_range[1] - current_image_range[1]) * one_osc
)
self._configdict["valid_osc_range"] = (new_osc_range_0, new_osc_range_1)
self.set_valid_image_range(new_image_range)
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""create an array-based scaling model."""
params = params.array
reflections = reflection_table.select(reflection_table["d"] > 0.0)
configdict = {"corrections": []}
# First initialise things common to more than one correction.
one_osc_width = experiment.scan.get_oscillation()[1]
osc_range = experiment.scan.get_oscillation_range()
configdict.update({"valid_osc_range": osc_range})
n_time_param, time_norm_fac, time_rot_int = initialise_smooth_input(
osc_range, one_osc_width, params.decay_interval
)
(xvalues, yvalues, _) = reflections["xyzobs.px.value"].parts()
(xmax, xmin) = (flex.max(xvalues) + 0.001, flex.min(xvalues) - 0.001)
(ymax, ymin) = (flex.max(yvalues) + 0.001, flex.min(yvalues) - 0.001)
parameters_dict = {}
if params.decay_correction:
configdict["corrections"].append("decay")
resmax = (1.0 / (flex.min(reflections["d"]) ** 2)) + 1e-6
resmin = (1.0 / (flex.max(reflections["d"]) ** 2)) - 1e-6
n_res_bins = params.n_resolution_bins
n_res_param, res_bin_width = calc_n_param_from_bins(
resmin, resmax, n_res_bins
)
configdict.update(
{
"n_res_param": n_res_param,
"n_time_param": n_time_param,
"resmin": resmin,
"res_bin_width": res_bin_width,
"time_norm_fac": time_norm_fac,
"time_rot_interval": time_rot_int,
}
)
parameters_dict["decay"] = {
"parameters": flex.double((n_time_param * n_res_param), 1.0),
"parameter_esds": None,
}
if params.absorption_correction:
configdict["corrections"].append("absorption")
nxbins = nybins = params.n_absorption_bins
n_x_param, x_bin_width = calc_n_param_from_bins(xmin, xmax, nxbins)
n_y_param, y_bin_width = calc_n_param_from_bins(ymin, ymax, nybins)
configdict.update(
{
"n_x_param": n_x_param,
"n_y_param": n_y_param,
"xmin": xmin,
"ymin": ymin,
"x_bin_width": x_bin_width,
"y_bin_width": y_bin_width,
"n_time_param": n_time_param,
"time_norm_fac": time_norm_fac,
"time_rot_interval": time_rot_int,
}
)
parameters_dict["absorption"] = {
"parameters": flex.double((n_x_param * n_y_param * n_time_param), 1.0),
"parameter_esds": None,
}
if params.modulation_correction:
configdict["corrections"].append("modulation")
nx_det_bins = ny_det_bins = params.n_modulation_bins
n_x_mod_param, x_det_bw = calc_n_param_from_bins(xmin, xmax, nx_det_bins)
n_y_mod_param, y_det_bw = calc_n_param_from_bins(ymin, ymax, ny_det_bins)
configdict.update(
{
"n_x_mod_param": n_x_mod_param,
"n_y_mod_param": n_y_mod_param,
"xmin": xmin,
"ymin": ymin,
"x_det_bin_width": x_det_bw,
"y_det_bin_width": y_det_bw,
}
)
parameters_dict["modulation"] = {
"parameters": flex.double((n_x_mod_param * n_y_mod_param), 1.0),
"parameter_esds": None,
}
model = cls(parameters_dict, configdict)
if params.correction.fix:
model.fixed_components = params.correction.fix
return model
[docs] @classmethod
def from_dict(cls, obj):
"""Create an :obj:`ArrayScalingModel` from a dictionary."""
if obj["__id__"] != cls.id_:
raise RuntimeError(f"expected __id__ {cls.id_}, got {obj['__id__']}")
configdict = obj["configuration_parameters"]
(dec_params, abs_params, mod_params) = (None, None, None)
(d_params_sds, a_params_sds, m_params_sds) = (None, None, None)
if "decay" in configdict["corrections"]:
dec_params = flex.double(obj["decay"]["parameters"])
if "est_standard_devs" in obj["decay"]:
d_params_sds = flex.double(obj["decay"]["est_standard_devs"])
if "absorption" in configdict["corrections"]:
abs_params = flex.double(obj["absorption"]["parameters"])
if "est_standard_devs" in obj["absorption"]:
a_params_sds = flex.double(obj["absorption"]["est_standard_devs"])
if "modulation" in configdict["corrections"]:
mod_params = flex.double(obj["modulation"]["parameters"])
if "est_standard_devs" in obj["modulation"]:
m_params_sds = flex.double(obj["modulation"]["est_standard_devs"])
parameters_dict = {
"decay": {"parameters": dec_params, "parameter_esds": d_params_sds},
"absorption": {"parameters": abs_params, "parameter_esds": a_params_sds},
"modulation": {"parameters": mod_params, "parameter_esds": m_params_sds},
}
return cls(parameters_dict, configdict, is_scaled=True)
[docs] def plot_model_components(self, reflection_table=None):
d = {}
if "absorption" in self.components:
d.update(plot_array_absorption_plot(self))
if "decay" in self.components:
d.update(plot_array_decay_plot(self))
if "modulation" in self.components:
d.update(plot_array_modulation_plot(self))
return d
[docs]class KBScalingModel(ScalingModelBase):
"""A scaling model for a KB parameterisation."""
id_ = "KB"
phil_scope = phil.parse(kb_model_phil_str)
[docs] def __init__(self, parameters_dict, configdict, is_scaled=False):
"""Create the KB scaling model components."""
super().__init__(configdict, is_scaled)
if "scale" in configdict["corrections"]:
self._components["scale"] = SingleScaleFactor(
parameters_dict["scale"]["parameters"],
parameters_dict["scale"]["parameter_esds"],
)
if "decay" in configdict["corrections"]:
self._components["decay"] = SingleBScaleFactor(
parameters_dict["decay"]["parameters"],
parameters_dict["decay"]["parameter_esds"],
)
@property
def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names to indicate scaling order."""
return [["scale", "decay"]]
[docs] def update(self, params):
if params.KB.correction.fix:
self.fixed_components = params.KB.correction.fix
[docs] @classmethod
def from_dict(cls, obj):
"""Create an :obj:`KBScalingModel` from a dictionary."""
if obj["__id__"] != cls.id_:
raise RuntimeError(f"expected __id__ {cls.id_}, got {obj['__id__']}")
configdict = obj["configuration_parameters"]
(s_params, d_params) = (None, None)
(s_params_sds, d_params_sds) = (None, None)
if "scale" in configdict["corrections"]:
s_params = flex.double(obj["scale"]["parameters"])
if "est_standard_devs" in obj["scale"]:
s_params_sds = flex.double(obj["scale"]["est_standard_devs"])
if "decay" in configdict["corrections"]:
d_params = flex.double(obj["decay"]["parameters"])
if "est_standard_devs" in obj["decay"]:
d_params_sds = flex.double(obj["decay"]["est_standard_devs"])
parameters_dict = {
"scale": {"parameters": s_params, "parameter_esds": s_params_sds},
"decay": {"parameters": d_params, "parameter_esds": d_params_sds},
}
return cls(parameters_dict, configdict, is_scaled=True)
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""Create the :obj:`KBScalingModel` from data."""
configdict = {"corrections": []}
parameters_dict = {}
if params.KB.decay_correction:
configdict["corrections"].append("decay")
parameters_dict["decay"] = {
"parameters": flex.double([0.0]),
"parameter_esds": None,
}
configdict["corrections"].append("scale")
parameters_dict["scale"] = {
"parameters": flex.double([1.0]),
"parameter_esds": None,
}
model = cls(parameters_dict, configdict)
if params.KB.correction.fix:
model.fixed_components = params.KB.correction.fix
return model
[docs]def calculate_new_offset(
current_image_0, new_image_0, new_norm_fac, n_old_param, n_new_param
):
"""Calculate the parameter offset for the new image range.
Returns:
int: An offset to apply when selecting the new parameters from the
existing parameters.
"""
if n_old_param == 2:
return 0 # can't have less than two params
batch_difference = (new_image_0 - current_image_0) * new_norm_fac
n_to_shift = int(batch_difference // 1)
if batch_difference % 1 > 0.5:
n_to_shift += 1
return min(n_old_param - n_new_param, n_to_shift) # can't shift by more
# than difference between old and new
[docs]def calc_n_param_from_bins(value_min, value_max, n_bins):
"""Return the correct number of bins for initialising the gaussian
smoothers."""
assert n_bins > 0
assert isinstance(n_bins, int)
bin_width = (value_max - value_min) / n_bins
if n_bins == 1:
n_param = 2
elif n_bins == 2:
n_param = 3
else:
n_param = n_bins + 2
return n_param, bin_width
model_phil_scope = phil.parse("")
_dxtbx_scaling_models = {
ep.name: ep for ep in pkg_resources.iter_entry_points("dxtbx.scaling_model_ext")
}
assert (
_dxtbx_scaling_models
), "No models registered with dxtbx.scaling_model_ext entry point"
model_phil_scope.adopt_scope(
phil.parse(
"model ="
+ " ".join(_dxtbx_scaling_models)
+ "\n .type = choice"
+ "\n .help = Set scaling model to be applied to input datasets"
+ "\n .expert_level = 0"
)
)
for entry_point_name, entry_point in _dxtbx_scaling_models.items():
ext_master_scope = phil.parse("%s .expert_level=1 {}" % entry_point_name)
ext_phil_scope = ext_master_scope.get_without_substitution(entry_point_name)
assert len(ext_phil_scope) == 1
ext_phil_scope = ext_phil_scope[0]
ext_phil_scope.adopt_scope(entry_point.load().phil_scope)
model_phil_scope.adopt_scope(ext_master_scope)
[docs]def plot_scaling_models(model, reflection_table=None):
"""Return a dict of component plots for the model for plotting with plotly."""
return model.plot_model_components(reflection_table=reflection_table)
[docs]def make_combined_plots(data):
"""Make any plots that require evaluation of all models."""
if all(d.id_ == "dose_decay" for d in data.values()):
relative_Bs = [
d.to_dict()["relative_B"]["parameters"][0] for d in data.values()
]
return plot_relative_Bs(relative_Bs)
return {}