Source code for dxtbx.model.experiment_list

import collections
import copy
import errno
import itertools
import json
import logging
import operator
import os
import pickle

import pkg_resources

import dxtbx.datablock
from dxtbx.datablock import (
    BeamComparison,
    DataBlockFactory,
    DetectorComparison,
    GoniometerComparison,
)
from dxtbx.format.Format import Format
from dxtbx.format.FormatMultiImage import FormatMultiImage
from dxtbx.format.image import ImageBool, ImageDouble
from dxtbx.imageset import ImageGrid, ImageSequence, ImageSet, ImageSetFactory
from dxtbx.model import (
    BeamFactory,
    CrystalFactory,
    DetectorFactory,
    Experiment,
    ExperimentList,
    GoniometerFactory,
    ProfileModelFactory,
    ScanFactory,
)
from dxtbx.sequence_filenames import (
    locate_files_matching_template_string,
    template_image_range,
    template_regex,
    template_string_number_index,
)
from dxtbx.serialize import xds
from dxtbx.serialize.filename import resolve_path
from dxtbx.util import get_url_scheme

try:
    from typing import (
        Any,
        Callable,
        Dict,
        Generator,
        Iterable,
        List,
        Optional,
        Tuple,
        Type,
    )
except ImportError:
    pass

__all__ = [
    "BeamComparison",
    "DetectorComparison",
    "ExperimentListFactory",
    "GoniometerComparison",
]


logger = logging.getLogger(__name__)


class InvalidExperimentListError(RuntimeError):
    """
    Indicates an error whilst validating the experiment list.

    This means that there is some structural problem that prevents the given data
    from representing a well-formed experiment list. This doesn't indicate e.g.
    some problem with the data or model consistency.

    In this module it is raised by ``ExperimentListDict.__init__`` when the
    object passed in is not dict-like, or when its ``__id__`` entry is not
    ``"ExperimentList"``.
    """


class ExperimentListDict:
    """A helper class for serializing the experiment list to dictionary (needed
    to save the experiment list to JSON format."""

    def __init__(self, obj, check_format=True, directory=None):
        """
        Validate a serialized experiment list and take a private copy of it.

        Args:
            obj: Dict-like serialized experiment list. It is deep-copied, so
                later rewriting of experiment entries never touches the
                caller's object.
            check_format (bool): Stored and later forwarded to the imageset
                factories; also gates loading of external lookup pickles.
            directory: Base directory passed to ``resolve_path`` whenever a
                file reference in the dictionary has to be resolved.

        Raises:
            InvalidExperimentListError: If ``obj`` is a list, has no ``get``
                attribute, or its ``__id__`` entry is not "ExperimentList".
        """
        # Something that is not dict-like (e.g. a DataBlock list) may be
        # passed in by mistake - reject it before doing anything else.
        looks_like_dict = not isinstance(obj, list) and hasattr(obj, "get")
        if not looks_like_dict:
            raise InvalidExperimentListError(
                "Expected dictionary, not {}".format(type(obj))
            )

        self._obj = copy.deepcopy(obj)
        self._check_format = check_format
        self._directory = directory

        # Refuse anything that does not identify itself as an ExperimentList
        declared_id = self._obj.get("__id__")
        if declared_id != "ExperimentList":
            raise InvalidExperimentListError(
                "Expected __id__ 'ExperimentList', but found {}".format(
                    repr(declared_id)
                )
            )

        # Per model kind: the callable that turns one serialized entry into
        # the object stored in the lookup table. Imagesets stay as plain
        # dictionaries here and are only built in decode().
        converters = (
            ("beam", BeamFactory.from_dict),
            ("detector", DetectorFactory.from_dict),
            ("goniometer", GoniometerFactory.from_dict),
            ("scan", ScanFactory.from_dict),
            ("crystal", CrystalFactory.from_dict),
            ("profile", ProfileModelFactory.from_dict),
            ("imageset", lambda x: x),
            ("scaling_model", self._scaling_model_from_dict),
        )

        # Build every lookup list; this also rewrites any path-style
        # references inside the experiments into integer indices.
        lookups = {}
        for model_name, converter in converters:
            lookups[model_name] = self._extract_models(model_name, converter)
        self._lookups = lookups

    def _extract_models(self, name, from_dict):
        """
        Helper function. Extract the models.

        if name == imageset: Extract imageset objects from the source.

        This function does resolving of an (old) method of imageset lookup
        e.g. it was valid to have a string as the imageset value in an
        experiment instead of an int - in which case the imageset was
        loaded from the named file in the target directory.

        If any experiments point to a file in this way, the imageset is
        loaded and the experiment is rewritted with an integer pointing
        to the new ImageSet in the returned list.

        Returns:
                The ordered list of serialized-ImageSet dictionaries
                that the Experiment list points to.
        """

        # Extract all the model list
        mlist = self._obj.get(name, [])

        # Convert the model from dictionary to concreate
        # python class for the model.
        mlist = [from_dict(d) for d in mlist]

        # Dictionaries for file mappings
        mmap = {}

        # For each experiment, check the model is not specified by
        # a path, if it is then get the dictionary of the model
        # and insert it into the list. Replace the path reference
        # with an index
        for eobj in self._obj["experiment"]:
            value = eobj.get(name)
            if value is None:
                continue
            elif isinstance(value, str):
                if value not in mmap:
                    mmap[value] = len(mlist)
                    mlist.append(
                        from_dict(_experimentlist_from_file(value, self._directory))
                    )
                eobj[name] = mmap[value]
            elif not isinstance(value, int):
                raise TypeError("expected int or str, got %s" % type(value))

        return mlist

    def _load_pickle_path(self, imageset_data, param):
        # type: (Dict, str) -> Tuple[Optional[str], Any]
        """
        Read a filename from an imageset dict and load if required.

        Args:
            imageset_data: The dictionary holding imageset information
            param: The key name to lookup in the imageset dictionary

        Returns:
            A tuple of (filename, data) where data has been loaded from
            the pickle file. If there is no key entry then (None, None)
            is returned. If the configuration parameter check_format is
            False then (filename, None) will be returned.
        """
        if param not in imageset_data:
            return "", None

        filename = resolve_path(imageset_data[param], directory=self._directory)
        if self._check_format and filename:
            with open(filename, "rb") as fh:
                return filename, pickle.load(fh, encoding="bytes")

        return filename or "", None

    def _imageset_from_imageset_data(self, imageset_data, models):
        """
        Build an imageset object from its serialized dictionary.

        Helper for decode(). Dispatches on ``imageset_data["__id__"]`` to the
        matching ``_make_*`` method, attaches the external lookup data
        (mask, gain, pedestal, dx, dy) and assigns the supplied models.

        Args:
            imageset_data (dict): Serialized imageset. Must not be None and
                must contain "__id__"; "params" and the lookup keys are
                optional.
            models (dict): Must contain the keys "beam", "detector",
                "goniometer" and "scan".

        Returns:
            The constructed imageset, or None for a "MemImageSet" entry
            (``_make_mem_imageset`` always returns None).

        Raises:
            RuntimeError: If the "__id__" value is not a known imageset type.
        """
        assert imageset_data is not None
        # Optional keyword arguments forwarded to the imageset factory
        if "params" in imageset_data:
            format_kwargs = imageset_data["params"]
        else:
            format_kwargs = {}

        beam = models["beam"]
        detector = models["detector"]
        goniometer = models["goniometer"]
        scan = models["scan"]

        # Load the external lookup data. Each call yields (filename, data);
        # data is None when the key is absent or check_format is disabled.
        mask_filename, mask = self._load_pickle_path(imageset_data, "mask")
        gain_filename, gain = self._load_pickle_path(imageset_data, "gain")
        pedestal_filename, pedestal = self._load_pickle_path(imageset_data, "pedestal")
        dx_filename, dx = self._load_pickle_path(imageset_data, "dx")
        dy_filename, dy = self._load_pickle_path(imageset_data, "dy")

        # Dispatch on the serialized type tag. "ImageSweep" is accepted as
        # an alternative tag for "ImageSequence".
        if imageset_data["__id__"] == "ImageSet":
            imageset = self._make_stills(imageset_data, format_kwargs=format_kwargs)
        elif imageset_data["__id__"] == "ImageGrid":
            imageset = self._make_grid(imageset_data, format_kwargs=format_kwargs)
        elif (
            imageset_data["__id__"] == "ImageSequence"
            or imageset_data["__id__"] == "ImageSweep"
        ):
            imageset = self._make_sequence(
                imageset_data,
                beam=beam,
                detector=detector,
                goniometer=goniometer,
                scan=scan,
                format_kwargs=format_kwargs,
            )
        elif imageset_data["__id__"] == "MemImageSet":
            imageset = self._make_mem_imageset(imageset_data)
        else:
            raise RuntimeError("Unknown imageset type")

        # imageset is None only for the MemImageSet case above
        if imageset is not None:
            # Set the external lookup. Missing data becomes an empty
            # ImageBool/ImageDouble; loaded data is wrapped in one.
            if mask is None:
                mask = ImageBool()
            else:
                mask = ImageBool(mask)
            if gain is None:
                gain = ImageDouble()
            else:
                gain = ImageDouble(gain)
            if pedestal is None:
                pedestal = ImageDouble()
            else:
                pedestal = ImageDouble(pedestal)
            if dx is None:
                dx = ImageDouble()
            else:
                dx = ImageDouble(dx)
            if dy is None:
                dy = ImageDouble()
            else:
                dy = ImageDouble(dy)

            # If the freshly built imageset already carries a mask, combine
            # it tile-by-tile with the loaded one instead of replacing it.
            # When the loaded mask is empty the existing mask is kept as is.
            # NOTE(review): `&=` is presumably an in-place element-wise AND
            # on the tile data - confirm against the flex array type.
            if not imageset.external_lookup.mask.data.empty():
                if not mask.empty():
                    mask = tuple(m.data() for m in mask)
                    for m1, m2 in zip(mask, imageset.external_lookup.mask.data):
                        m1 &= m2.data()
                    imageset.external_lookup.mask.data = ImageBool(mask)
            else:
                imageset.external_lookup.mask.data = mask
            # Filenames are recorded even when the data was not loaded
            imageset.external_lookup.mask.filename = mask_filename
            imageset.external_lookup.gain.data = gain
            imageset.external_lookup.gain.filename = gain_filename
            imageset.external_lookup.pedestal.data = pedestal
            imageset.external_lookup.pedestal.filename = pedestal_filename
            imageset.external_lookup.dx.data = dx
            imageset.external_lookup.dx.filename = dx_filename
            imageset.external_lookup.dy.data = dy
            imageset.external_lookup.dy.filename = dy_filename

            # Update the imageset models: a sequence holds one model set for
            # the whole set, the other imageset types are assigned per image.
            if isinstance(imageset, ImageSequence):
                imageset.set_beam(beam)
                imageset.set_detector(detector)
                imageset.set_goniometer(goniometer)
                imageset.set_scan(scan)
            elif isinstance(imageset, (ImageSet, ImageGrid)):
                for i in range(len(imageset)):
                    imageset.set_beam(beam, i)
                    imageset.set_detector(detector, i)
                    imageset.set_goniometer(goniometer, i)
                    imageset.set_scan(scan, i)

            imageset.update_detector_px_mm_data()

        return imageset

    def decode(self):
        """
        Decode the dictionary into a list of experiments.

        Works in two passes over the serialized experiments:

        1. For every referenced imageset, work out a single scan covering
           the scans of all experiments that share that imageset.
        2. Build one Experiment per entry. Each distinct imageset reference
           is constructed once (using the covering scan from pass 1) and
           shared between the experiments that point at it, while every
           Experiment keeps its own scan model.

        Returns:
            ExperimentList: The decoded experiments, in serialized order.
        """
        # Extract all the experiments - first find all scans belonging to
        # same imageset

        # Maps imageset reference -> scan covering all experiments using it
        eobj_scan = {}

        for eobj in self._obj["experiment"]:
            # Experiments without an imageset contribute nothing here
            if self._lookup_model("imageset", eobj) is None:
                continue
            imageset_ref = eobj.get("imageset")
            scan = self._lookup_model("scan", eobj)

            if imageset_ref in eobj_scan:
                # if there is no scan, or scan is identical, move on, else
                # make a scan which encompasses both scans
                # NOTE(review): if the first experiment seen for this
                # imageset had no scan, None was stored below, and a later
                # experiment that does have a scan would reach
                # get_image_range() on None - confirm whether such input
                # can occur.
                if not scan or scan == eobj_scan[imageset_ref]:
                    continue
                i = eobj_scan[imageset_ref].get_image_range()
                j = scan.get_image_range()
                if i[1] + 1 == j[0]:
                    # The new scan starts right after the stored one ends
                    eobj_scan[imageset_ref] += scan
                else:
                    # make a new bigger scan
                    o = eobj_scan[imageset_ref].get_oscillation()
                    s = scan.get_oscillation()
                    # Second oscillation element must match exactly
                    # (presumably the oscillation width - TODO confirm)
                    assert o[1] == s[1]
                    scan = copy.deepcopy(scan)
                    scan.set_image_range((min(i[0], j[0]), max(i[1], j[1])))
                    scan.set_oscillation((min(o[0], s[0]), o[1]))
                    eobj_scan[imageset_ref] = scan
            else:
                # First sighting: keep a copy so the accumulation above never
                # alters the model stored in the lookup table
                eobj_scan[imageset_ref] = copy.deepcopy(scan)

        # Map of imageset/scan pairs
        # (imageset reference -> constructed imageset, or None)
        imagesets = {}

        # For every experiment, use the given input to create
        # a sensible experiment.
        el = ExperimentList()
        for eobj in self._obj["experiment"]:

            # Get the models
            identifier = eobj.get("identifier", "")
            beam = self._lookup_model("beam", eobj)
            detector = self._lookup_model("detector", eobj)
            goniometer = self._lookup_model("goniometer", eobj)
            scan = self._lookup_model("scan", eobj)
            crystal = self._lookup_model("crystal", eobj)
            profile = self._lookup_model("profile", eobj)
            scaling_model = self._lookup_model("scaling_model", eobj)

            models = {
                "beam": beam,
                "detector": detector,
                "goniometer": goniometer,
                "scan": scan,
                "crystal": crystal,
                "profile": profile,
                "scaling_model": scaling_model,
            }

            imageset_ref = eobj.get("imageset")

            # If not already cached, load this imageset
            if imageset_ref not in imagesets:
                imageset_data = self._lookup_model("imageset", eobj)
                if imageset_data is not None:
                    # Create the imageset from the input data. The imageset
                    # receives the covering scan from the first pass; the
                    # Experiment below still gets this entry's own scan.
                    models["scan"] = eobj_scan[imageset_ref]
                    imageset = self._imageset_from_imageset_data(imageset_data, models)
                    imagesets[imageset_ref] = imageset
                else:
                    # Even if we have an empty entry, this counts as a load
                    imagesets[imageset_ref] = None

            # Append the experiment
            el.append(
                Experiment(
                    imageset=imagesets[imageset_ref],
                    beam=beam,
                    detector=detector,
                    goniometer=goniometer,
                    scan=scan,
                    crystal=crystal,
                    profile=profile,
                    scaling_model=scaling_model,
                    identifier=identifier,
                )
            )

        return el

    def _make_mem_imageset(self, imageset):
        """
        Placeholder for in-memory imagesets, which cannot be rebuilt from a dict.

        Args:
            imageset: The serialized imageset dictionary (unused).

        Returns:
            None, always.
        """
        return None

    def _make_stills(self, imageset, format_kwargs=None):
        """
        Build a still-image ImageSet from its serialized dictionary.

        Args:
            imageset (dict): Serialized imageset. Must contain an "images"
                list; may contain "single_file_indices", which must then have
                the same length as "images".
            format_kwargs: Forwarded unchanged to the imageset factory.

        Returns:
            The result of ``ImageSetFactory.make_imageset``.
        """
        filenames = []
        for path in imageset["images"]:
            # Entries with a URL scheme are used verbatim; everything else
            # is resolved against the experiment list's directory.
            if get_url_scheme(path):
                filenames.append(path)
            else:
                filenames.append(resolve_path(path, directory=self._directory))

        if "single_file_indices" in imageset:
            indices = imageset["single_file_indices"]
            assert len(indices) == len(filenames)
        else:
            indices = None

        return ImageSetFactory.make_imageset(
            filenames,
            None,
            check_format=self._check_format,
            single_file_indices=indices,
            format_kwargs=format_kwargs,
        )

    def _make_grid(self, imageset, format_kwargs=None):
        """
        Build an ImageGrid from its serialized dictionary.

        The images are first loaded as a still imageset, which is then
        converted to a grid using the serialized "grid_size" entry.
        """
        grid_size = imageset["grid_size"]
        stills = self._make_stills(imageset, format_kwargs=format_kwargs)
        return ImageGrid.from_imageset(stills, grid_size)

    def _make_sequence(
        self,
        imageset,
        beam=None,
        detector=None,
        goniometer=None,
        scan=None,
        format_kwargs=None,
    ):
        """
        Build an image sequence from its serialized dictionary.

        Args:
            imageset (dict): Serialized imageset; must contain "template".
            beam, detector, goniometer, scan: Models forwarded to the factory.
                The scan, when given, also defines the image range.
            format_kwargs: Forwarded unchanged to the factory.

        Returns:
            The result of ``ImageSetFactory.make_sequence``.
        """
        # Get the template format
        template = resolve_path(imageset["template"], directory=self._directory)

        # The scan dictates the image range when present; without one the
        # range is derived from the template itself.
        if scan is not None:
            first_image, last_image = scan.get_image_range()
        else:
            first_image, last_image = template_image_range(template)

        # When format checking is switched off, only the presence of
        # "single_file_indices" tells us to use the multi-image format.
        use_multi_image = (
            self._check_format is False and "single_file_indices" in imageset
        )
        format_class = FormatMultiImage if use_multi_image else None

        image_numbers = list(range(first_image, last_image + 1))

        # Make a sequence from the input data
        return ImageSetFactory.make_sequence(
            template,
            image_numbers,
            format_class=format_class,
            check_format=self._check_format,
            beam=beam,
            detector=detector,
            goniometer=goniometer,
            scan=scan,
            format_kwargs=format_kwargs,
        )

    def _lookup_model(self, name, experiment_dict):
        """
        Find a model by looking up its index from a dictionary

        Args:
            name (str): The model name e.g. 'beam', 'detector'
            experiment_dict (Dict[str, int]):
                The experiment dictionary. experiment_dict[name] must
                exist and be not None to retrieve a model. If this key
                exists, then there *must* be an item with this index
                in the ExperimentListDict internal model stores.

        Returns:
            Optional[Any]:
                A model by looking up the index pointed to by
                experiment_dict[name]. If not present or empty,
                then None is returned.
        """
        if experiment_dict.get(name) is None:
            return None
        return self._lookups[name][experiment_dict[name]]

    @staticmethod
    def _scaling_model_from_dict(obj):
        """
        Build a scaling model from its serialized dictionary.

        Searches the "dxtbx.scaling_model_ext" entry points for one whose
        name equals ``obj["__id__"]`` and delegates to the ``from_dict`` of
        the object it loads.

        Returns:
            The deserialized scaling model, or None when no entry point
            matches.
        """
        candidates = pkg_resources.iter_entry_points("dxtbx.scaling_model_ext")
        for candidate in candidates:
            if candidate.name != obj["__id__"]:
                continue
            return candidate.load().from_dict(obj)
        return None


def _experimentlist_from_file(filename, directory=None):
    """
    Load a model dictionary from a JSON file.

    Args:
        filename: Path of the file, passed through ``resolve_path`` together
            with ``directory`` before opening.
        directory: Base directory handed to ``resolve_path``.

    Returns:
        The object decoded from the file's JSON content.

    Raises:
        OSError: If the file cannot be opened or read. The original error is
            attached as ``__cause__`` so its errno and details stay available.
    """
    filename = resolve_path(filename, directory=directory)
    try:
        with open(filename) as infile:
            return json.load(infile)
    except OSError as err:
        # Keep the simplified message callers may rely on, but chain the
        # underlying error explicitly rather than hiding why the read failed.
        raise OSError("unable to read file, %s" % filename) from err


class ExperimentListFactory:
    """A class to help instantiate experiment lists."""

    @staticmethod
    def from_args(args, unhandled=None):
        """
        Try to load serialised experiments from any recognised format.

        Args:
            args: Iterable of filenames to try.
            unhandled (list): If given, every filename that could not be
                loaded is appended to it.

        Returns:
            ExperimentList: All experiments that could be loaded.
        """
        # Create a list for unhandled arguments
        if unhandled is None:
            unhandled = []

        experiments = ExperimentList()

        # Try to load from serialized formats. Loading is best-effort by
        # design: any failure just marks the argument as unhandled.
        for filename in args:
            try:
                experiments.extend(
                    ExperimentListFactory.from_serialized_format(filename)
                )
                logger.debug(f"Loaded experiments from {filename}")
            except Exception as e:
                logger.debug(f"Could not load experiments from {filename}: {e}")
                unhandled.append(filename)

        return experiments

    @staticmethod
    def from_filenames(
        filenames,
        unhandled=None,
        compare_beam=None,
        compare_detector=None,
        compare_goniometer=None,
        scan_tolerance=None,
        format_kwargs=None,
        load_models=True,
    ):
        """
        Create an experiment list from a list of directory or file names.

        Args:
            filenames: Paths to process.
            unhandled (list): If given, files whose format is unknown or
                abstract are appended to it.
            compare_beam, compare_detector, compare_goniometer: Comparison
                functions forwarded to the metadata merging step.
            scan_tolerance: Forwarded to the scan merging step.
            format_kwargs (dict): Extra keyword arguments for the format
                classes; None is treated as an empty dict.
            load_models (bool): Forwarded to from_imageset_and_crystal.

        Returns:
            ExperimentList: Experiments built from all recognised files.
        """
        experiments = ExperimentList()

        # Process each file given by this path list
        to_process = _openingpathiterator(filenames)
        find_format = dxtbx.datablock.FormatChecker()

        # Format class -> list of imagesets (multi-image formats) or
        # metadata records (single-image formats)
        format_groups = collections.defaultdict(list)
        if format_kwargs is None:
            format_kwargs = {}

        for filename in to_process:
            # We now have a file, pre-opened by Format.open_file (therefore
            # cached). Determine its type, and prepare to put into a group
            format_class = find_format.find_format(filename)

            # Verify this makes sense
            if not format_class:
                # No format class found?
                logger.debug("Could not determine format for %s", filename)
                if unhandled is not None:
                    unhandled.append(filename)
            elif format_class.is_abstract():
                logger.debug(
                    f"Image file {filename} appears to be a '{format_class.__name__}', but this is an abstract Format"
                )
                # Invalid format class found?
                if unhandled is not None:
                    unhandled.append(filename)
            elif issubclass(format_class, FormatMultiImage):
                imageset = format_class.get_imageset(
                    os.path.abspath(filename), format_kwargs=format_kwargs
                )
                format_groups[format_class].append(imageset)
                logger.debug("Loaded file: %s", filename)
            else:
                format_object = format_class(filename, **format_kwargs)
                meta = ImageMetadataRecord.from_format(format_object)
                assert meta.filename == filename

                # Add this entry to our table of formats
                format_groups[format_class].append(meta)
                logger.debug("Loaded metadata of file: %s", filename)

        # Now, build experiments from these files. Duplicating the logic of
        # the previous implementation:
        # - FormatMultiImage files each have their own ImageSet
        # - Every set of images forming a scan goes into its own ImageSequence
        # - Any consecutive still frames that share any metadata with the
        #   previous still frame get collected into one ImageSet

        # Treat each format as a separate datablock
        for format_class, records in format_groups.items():
            if issubclass(format_class, FormatMultiImage):
                # Here the records already are imagesets
                for imageset in records:
                    experiments.extend(
                        ExperimentListFactory.from_imageset_and_crystal(
                            imageset, crystal=None, load_models=load_models
                        )
                    )
                continue

            # Merge any consecutive and identical metadata together
            _merge_model_metadata(
                records,
                compare_beam=compare_beam,
                compare_detector=compare_detector,
                compare_goniometer=compare_goniometer,
            )
            records = _merge_scans(records, scan_tolerance=scan_tolerance)
            imagesets = _convert_to_imagesets(records, format_class, format_kwargs)
            imagesets = list(imagesets)

            # Validate this datablock and store it
            assert imagesets, "Datablock got no imagesets?"
            for imageset in imagesets:
                experiments.extend(
                    ExperimentListFactory.from_imageset_and_crystal(
                        imageset, crystal=None, load_models=load_models
                    )
                )

        return experiments

    @staticmethod
    def from_imageset_and_crystal(imageset, crystal, load_models=True):
        """
        Load an experiment list from an imageset and crystal.

        ImageSequence instances are handled by from_sequence_and_crystal,
        everything else by from_stills_and_crystal.
        """
        if isinstance(imageset, ImageSequence):
            return ExperimentListFactory.from_sequence_and_crystal(
                imageset, crystal, load_models
            )
        else:
            return ExperimentListFactory.from_stills_and_crystal(
                imageset, crystal, load_models
            )

    @staticmethod
    def from_sequence_and_crystal(imageset, crystal, load_models=True):
        """
        Create an experiment list from sequence and crystal.

        With ``load_models`` the models are taken from the imageset; a
        sequence whose scan reports is_still() yields one experiment per
        image. Without it, a single experiment holding only the imageset
        and crystal is returned.
        """
        assert isinstance(imageset, ImageSequence)

        experiments = ExperimentList()

        if load_models:
            # if imagesequence is still images, make one experiment for each
            # all referencing into the same image set
            if imageset.get_scan().is_still():
                start, end = imageset.get_scan().get_array_range()
                for j in range(start, end):
                    # Only the scan comes from the single-image subset; the
                    # experiment itself keeps the full imageset.
                    subset = imageset[j : j + 1]
                    experiments.append(
                        Experiment(
                            imageset=imageset,
                            beam=imageset.get_beam(),
                            detector=imageset.get_detector(),
                            goniometer=imageset.get_goniometer(),
                            scan=subset.get_scan(),
                            crystal=crystal,
                        )
                    )
            else:
                experiments.append(
                    Experiment(
                        imageset=imageset,
                        beam=imageset.get_beam(),
                        detector=imageset.get_detector(),
                        goniometer=imageset.get_goniometer(),
                        scan=imageset.get_scan(),
                        crystal=crystal,
                    )
                )

            return experiments

        else:
            return ExperimentList([Experiment(imageset=imageset, crystal=crystal)])

    @staticmethod
    def from_stills_and_crystal(imageset, crystal, load_models=True):
        """
        Create an experiment list from stills and crystal.

        One experiment is created per image, each holding a single-image
        slice of the imageset. Models are only attached when ``load_models``
        is true.
        """
        experiments = ExperimentList()
        if load_models:
            for i in range(len(imageset)):
                experiments.append(
                    Experiment(
                        imageset=imageset[i : i + 1],
                        beam=imageset.get_beam(i),
                        detector=imageset.get_detector(i),
                        goniometer=imageset.get_goniometer(i),
                        scan=imageset.get_scan(i),
                        crystal=crystal,
                    )
                )
        else:
            for i in range(len(imageset)):
                experiments.append(
                    Experiment(imageset=imageset[i : i + 1], crystal=crystal)
                )

        return experiments

    @staticmethod
    def from_datablock_and_crystal(datablock, crystal, load_models=True):
        """
        Load an experiment list from a datablock.

        Args:
            datablock: A datablock, or a list of datablocks (handled
                recursively, one at a time).
            crystal: Crystal model assigned to every experiment.
            load_models (bool): Forwarded to from_imageset_and_crystal.
        """
        # Initialise the experiment list
        experiments = ExperimentList()

        # If we have a list, loop through
        if isinstance(datablock, list):
            for db in datablock:
                experiments.extend(
                    ExperimentListFactory.from_datablock_and_crystal(
                        db, crystal, load_models
                    )
                )
            return experiments

        # Add all the imagesets
        for imageset in datablock.extract_imagesets():
            experiments.extend(
                ExperimentListFactory.from_imageset_and_crystal(
                    imageset, crystal, load_models
                )
            )

        # Check the list is consistent
        assert experiments.is_consistent()

        return experiments

    @staticmethod
    def from_dict(obj, check_format=True, directory=None):
        """
        Load an experiment list from a dictionary.

        Args:
            obj (dict):
                Dictionary containing either ExperimentList or DataBlock
                structure.
            check_format (bool):
                If True, the file will be read to verify metadata.
            directory (str):
                Base directory used to resolve file references.

        Returns:
            ExperimentList: The dictionary converted
        """
        # First try to interpret the input as a DataBlock structure. Any
        # failure is taken to mean "not a datablock", in which case we fall
        # through to decoding it as an ExperimentList dictionary below.
        try:
            experiments = ExperimentList()
            for db in DataBlockFactory.from_dict(
                obj, check_format=check_format, directory=directory
            ):
                experiments.extend(
                    ExperimentListFactory.from_datablock_and_crystal(db, None)
                )
        except Exception:
            experiments = None

        # Decode the experiments from the dictionary
        if experiments is None:
            experiments = ExperimentListDict(
                obj, check_format=check_format, directory=directory
            ).decode()

        # Check the list is consistent
        assert experiments.is_consistent()

        return experiments

    @staticmethod
    def from_json(text, check_format=True, directory=None):
        """Load an experiment list from JSON."""
        return ExperimentListFactory.from_dict(
            json.loads(text),
            check_format=check_format,
            directory=directory,
        )

    @staticmethod
    def from_json_file(filename, check_format=True):
        """
        Load an experiment list from a json file.

        The directory containing the file is used as the base directory for
        resolving file references inside it.
        """
        filename = os.path.abspath(filename)
        directory = os.path.dirname(filename)
        with open(filename) as infile:
            return ExperimentListFactory.from_json(
                infile.read(), check_format=check_format, directory=directory
            )

    @staticmethod
    def from_pickle_file(filename):
        """
        Decode an experiment list from a pickle file.

        NOTE: unpickling executes arbitrary code from the file; only use
        this on files from a trusted source.
        """
        with open(filename, "rb") as infile:
            obj = pickle.load(infile)
        assert isinstance(obj, ExperimentList)
        return obj

    @staticmethod
    def from_xds(xds_inp, xds_other):
        """Generate an experiment list from XDS files."""
        # Get the sequence from the XDS files
        sequence = xds.to_imageset(xds_inp, xds_other)

        # Get the crystal from the XDS files
        crystal = xds.to_crystal(xds_other)

        # Create the experiment list
        experiments = ExperimentListFactory.from_imageset_and_crystal(sequence, crystal)

        # Exactly one experiment is expected from a single XDS sequence
        assert len(experiments) == 1

        return experiments

    @staticmethod
    def from_serialized_format(filename, check_format=True):
        """
        Try to load the experiment list from a serialized format.

        Path-like objects are unwrapped to a plain filename first; the file
        is then read as JSON.
        """
        if hasattr(filename, "__fspath__"):
            filename = filename.__fspath__()  # unwrap PEP-519-style objects
        return ExperimentListFactory.from_json_file(filename, check_format)

    @staticmethod
    def from_templates(templates, **kwargs):
        """
        Import an experiment list from templates.

        Args:
            templates: Non-empty sequence of filename templates.
            **kwargs: Recognised keys are "image_range" (first, last),
                "allow_incomplete_sequences" (bool) and "format_kwargs"
                (dict, or None for no extra arguments). "verbose" is
                rejected.

        Returns:
            ExperimentList: The experiments of all templates.

        Raises:
            ValueError: If a template matches no files, the image format is
                unknown or abstract, or images are missing from the range
                while incomplete sequences are not allowed.
        """
        assert "verbose" not in kwargs, "The verbose parameter has been removed"
        assert len(templates) > 0

        experiments = ExperimentList()
        find_format = dxtbx.datablock.FormatChecker()

        # An explicit format_kwargs=None must behave like "not given" when
        # it is splatted into the format constructor below.
        format_init_kwargs = kwargs.get("format_kwargs") or {}

        # For each template do an import
        for template in templates:
            template = os.path.normpath(template)
            filenames = sorted(locate_files_matching_template_string(template))
            if len(filenames):
                logger.debug(
                    "The following files matched the template string:\n%s",
                    "\n".join(f" {p}" for p in filenames),
                )

            # Check if we've matched any filenames
            if len(filenames) == 0:
                raise ValueError(f"Template '{template}' does not match any files")

            # Get the format from the first image
            format_class = find_format.find_format(filenames[0])

            # Verify this makes sense
            if format_class is None:
                raise ValueError(f"Image file {filenames[0]} format is unknown")
            if format_class.is_abstract():
                # format_class is itself a class, so its own __name__ is the
                # format name (type(format_class) would be the metaclass).
                raise ValueError(
                    f"Image file {filenames[0]} appears to be a '{format_class.__name__}', but this is an abstract Format"
                )

            # Position of the image number within each filename
            index = slice(*template_string_number_index(template))

            image_range = kwargs.get("image_range")
            if image_range:
                first, last = image_range
            else:
                first, last = template_image_range(template)

            if not kwargs.get("allow_incomplete_sequences", False):
                if "#" in template:
                    # Check all images in range are present - if allowed
                    all_numbers = {int(f[index]) for f in filenames}
                    missing = set(range(first, last + 1)) - all_numbers
                    if missing:
                        raise ValueError(
                            "Missing image{} {} from imageset ({}-{})".format(
                                "s" if len(missing) > 1 else "",
                                ", ".join(str(x) for x in sorted(missing)),
                                first,
                                last,
                            )
                        )
                else:
                    print(
                        "Warning: Using only one template file: %s. \n "
                        "`allow_incomplete_sequence` has no effect" % template
                    )

            # Read the image
            fmt = format_class(filenames[0], **format_init_kwargs)

            # Update the image range
            image_range = (first, last)
            scan = fmt.get_scan()
            scan.set_image_range(image_range)

            # Create the sequence and experiment
            imageset = ImageSetFactory.make_sequence(
                template,
                list(range(first, last + 1)),
                format_class,
                fmt.get_beam(),
                fmt.get_detector(),
                fmt.get_goniometer(),
                scan,
                format_kwargs=kwargs.get("format_kwargs"),
            )
            experiments.extend(
                ExperimentListFactory.from_imageset_and_crystal(
                    imageset,
                    crystal=None,
                    load_models=True,
                )
            )

        return experiments
class ImageMetadataRecord:
    """Object to store metadata information.

    This is used whilst building the datablocks. The metadata for each
    image can be read once, and then any grouping/deduplication can happen
    later, without re-opening the original file.
    """

    def __init__(
        self,
        beam=None,
        detector=None,
        goniometer=None,
        scan=None,
        template=None,
        filename=None,
        index=None,
    ):
        # type: (dxtbx.model.Beam, dxtbx.model.Detector, dxtbx.model.Goniometer, dxtbx.model.Scan, str, str, int) -> None
        """
        Args:
            beam:       Stores a beam model
            detector:   Stores a detector model
            goniometer: Stores a goniometer model
            scan:       Stores a scan model
            filename:   The filename this record was parsed from
            template:
                The template string parsed from the filename. Usually,
                the template is only present if a scan was found.
            index:
                The index of this file in the template. Applying the
                index to the template field should recover the filename
        """
        self.beam = beam
        self.detector = detector
        self.goniometer = goniometer
        self.scan = scan
        self.template = template
        self.filename = filename
        self.index = index

    def merge_metadata_from(
        self,
        other_record,
        compare_beam=operator.__eq__,
        compare_detector=operator.__eq__,
        compare_goniometer=operator.__eq__,
    ):
        # type: (ImageMetadataRecord, Callable, Callable, Callable) -> bool
        """
        Compare two record objects and merge equivalent data.

        This method will compare (with optional functions) instance data for
        beam, detector and goniometer. If any of the metadata for this
        record is equivalent to (but a different instance from) the other
        record, then this instance will be altered to match the other.

        The function used to compare beams, detectors and goniometers can be
        customised - but by default the normal equality operator is used.

        Args:
            other_record:       Another metadata instance
            compare_beam:       A function to compare beams
            compare_detector:   A function to compare detectors
            compare_goniometer: A function to compare goniometers

        Returns: True if any action was taken
        """
        # Allow 'defaults' of None to work - behavior from legacy implementation
        compare_beam = compare_beam or operator.__eq__
        compare_detector = compare_detector or operator.__eq__
        compare_goniometer = compare_goniometer or operator.__eq__

        record_altered = False
        if self.beam is not other_record.beam and compare_beam(
            self.beam, other_record.beam
        ):
            self.beam = other_record.beam
            record_altered = True

        if self.detector is not other_record.detector and compare_detector(
            self.detector, other_record.detector
        ):
            self.detector = other_record.detector
            record_altered = True

        if self.goniometer is not other_record.goniometer and compare_goniometer(
            self.goniometer, other_record.goniometer
        ):
            self.goniometer = other_record.goniometer
            record_altered = True

        return record_altered

    @classmethod
    def from_format(cls, fmt):
        # type: (Format) -> Any
        """
        Read metadata information from a Format instance.

        This will only pull information out of a single format instance
        while it is open - combining metadata records must be done
        separately.

        Args:
            fmt: The instance of the format class to read data from

        Returns:
            A new ImageMetadataRecord with the pre-read information
        """
        record = cls()
        record.filename = fmt.get_image_file()

        # Get the metadata from the format. Reading is deliberately
        # best-effort: a model that cannot be read is left as None.
        try:
            record.beam = fmt.get_beam()
        except Exception:
            pass
        try:
            record.detector = fmt.get_detector()
        except Exception:
            pass
        try:
            record.goniometer = fmt.get_goniometer()
        except Exception:
            pass
        try:
            record.scan = fmt.get_scan()
        except Exception:
            pass

        # Get the template and index if possible - only when a scan was read
        if record.scan is not None:
            record.template, record.index = template_regex(record.filename)

        return record

    def __repr__(self):
        items = [
            ("filename", self.filename),
            ("beam", self.beam),
            ("detector", self.detector),
            ("goniometer", self.goniometer),
            ("scan", self.scan),
            ("template", self.template),
            ("index", self.index),
        ]
        itemstr = ", ".join(x + "=" + repr(y) for x, y in items)
        return "<{}{}{}>".format(type(self).__name__, " " if itemstr else "", itemstr)

    def __hash__(self):
        return hash(
            (
                self.beam,
                self.detector,
                self.goniometer,
                self.scan,
                self.template,
                self.filename,
                self.index,
            )
        )

    def __eq__(self, other):
        if not isinstance(other, ImageMetadataRecord):
            return False
        return all(
            getattr(self, attribute) == getattr(other, attribute)
            for attribute in (
                "beam",
                "detector",
                "goniometer",
                "scan",
                "template",
                "filename",
                "index",
            )
        )

    def __ne__(self, other):
        return not self == other


def _iterate_with_previous(iterable):
    """Convenience iterator to give pairs of (previous, next) items.

    The first pair yielded has ``None`` as its previous item.
    """
    previous = None
    for val in iterable:
        yield (previous, val)
        previous = val


def _groupby_template_is_none(records):
    # type: (Iterable[ImageMetadataRecord]) -> Generator[List[ImageMetadataRecord], None, None]
    """Specialization of groupby that groups records by template=None.

    Consecutive records without a template are yielded together as one list;
    every record that has a template is yielded as a list of its own.
    """
    # Untemplated records share the key -1 so that runs of them cluster, while
    # templated records use their (unique) position so they never group.
    for _, group in itertools.groupby(
        enumerate(records), key=lambda x: -1 if x[1].template is None else x[0]
    ):
        yield [x[1] for x in group]


def _openingpathiterator(pathnames: Iterable[str]):
    """Utility function to efficiently open all paths.

    A path is a potential file or directory.
    Each path will be opened with :meth:`dxtbx.format.Format.open_file`,
    but in order to do so each file will only be opened once, and extraneous
    use of :func:`os.stat` will be avoided.
    Any path entries that are a directory will be recursed into, once -
    any further directories found will be ignored. Any path that is not
    a file or directory, or on which IO fails for any reason, will still
    be returned.

    Args:
        pathnames: Paths to attempt to open

    Yields:
        Every path that is not a directory, as a string.
    """
    # Store a tuple of (recurse, pathname) to track what was root level
    paths = collections.deque((True, x) for x in sorted(pathnames))

    while paths:
        # Get the next path from the queue
        (do_recurse, pathname) = paths.popleft()
        pathname = os.fspath(pathname)
        try:
            # Attempt to open this 'path'
            Format.open_file(pathname)
        except OSError as e:
            if e.errno == errno.EISDIR:
                if do_recurse:
                    # We've tried to open a directory. Get all the entries...
                    subdir_paths = sorted(
                        os.path.join(pathname, x) for x in os.listdir(pathname)
                    )
                    # ... and add them to our queue. Make sure not to mark for recursion
                    paths.extendleft((False, x) for x in reversed(subdir_paths))
                    logger.debug("Adding %d files from %s", len(subdir_paths), pathname)
                else:
                    logger.debug("Not adding sub-level directory entry %s", pathname)
                # Don't return directory instances
                continue
            else:
                # A non-directory-related IO error. Not every OSError carries
                # an errno, so fall back to the exception text when absent.
                logger.debug("Could not import %s: %s", pathname, e.strerror or e)

        yield pathname


def _merge_model_metadata(
    records, compare_beam=None, compare_detector=None, compare_goniometer=None
):
    # type: (Iterable[ImageMetadataRecord], Callable, Callable, Callable) -> None
    """
    Merge metadata between consecutive record objects.

    This will compare each record with the previous one, and make sure
    the metadata instances are shared where appropriate.

    Args:
        records:    Records for the images to merge into imagesets
        compare_beam:       The function to compare beams
        compare_detector:   The function to compare detectors
        compare_goniometer: The function to compare goniometers
    """
    for prev, record in _iterate_with_previous(records):
        if prev is None:
            continue
        record.merge_metadata_from(
            prev,
            compare_beam=compare_beam,
            compare_detector=compare_detector,
            compare_goniometer=compare_goniometer,
        )


def _merge_scans(records, scan_tolerance=None):
    # type: (Iterable[ImageMetadataRecord], float) -> List[ImageMetadataRecord]
    """
    Merge consecutive scan records with identical metadata.

    The records should have previously had their model metadata merged,
    as identity will be used to compare metadata identity at this stage.

    Args:
        records: Records to merge
        scan_tolerance:
            Fraction of oscillation range to tolerate when merging scan
            records. When None, the scan's own default is used.

    Returns:
        A (potentially shorter) list of records with scans merged
    """
    merged_records = []
    logger.debug("Merging scans")
    for prev, record in _iterate_with_previous(records):
        # The first record always gets recorded
        if prev is None:
            merged_records.append(record)
            logger.debug(" Saving initial record %s", record)
            continue
        # Compare metadata instances
        same_metadata = [
            prev.beam is record.beam,
            prev.detector is record.detector,
            prev.goniometer is record.goniometer,
        ]
        # Condition for combining:
        # - All metadata must match
        # - Previous record must be templated
        # - This record must be templated
        if (
            all(same_metadata)
            and prev.template is not None
            and record.template is not None
        ):
            # Attempt to append to scan
            try:
                if scan_tolerance is None:
                    prev.scan.append(record.scan)
                else:
                    prev.scan.append(record.scan, scan_tolerance=scan_tolerance)
            except RuntimeError as e:
                # The scans could not be joined; fall through so that this
                # record starts a new scan of its own.
                logger.debug(
                    " Failed to merge record %s with previous (%s) - writing new scan",
                    record,
                    e,
                )
            else:
                # If we appended, then we don't need to keep this record's scan
                record.scan = prev.scan
                logger.debug(" Appended record %s to previous", record)
                continue
        merged_records.append(record)

    logger.debug("Result of merging record scans: %d records", len(merged_records))
    return merged_records


def _convert_to_imagesets(records, format_class, format_kwargs=None):
    # type: (Iterable[ImageMetadataRecord], Type[dxtbx.format.Format], Dict) -> Generator[dxtbx.imageset.ImageSet, None, None]
    """
    Convert records into imagesets.

    The records should have been metadata- and scan-merged by this point.
    Rules:
    - Consecutive records with template=None go into a single imageset
    - Anything with a template goes into a single sequence

    Args:
        records: The records to convert
        format_class: The format class for the data in this record
        format_kwargs: Any format configuration arguments to pass
            to the format imageset creator.

    Returns:
        Imagesets representing the records
    """
    # Iterate over images/sets such that template=None are clustered
    for setgroup in _groupby_template_is_none(records):
        if setgroup[0].template is not None:
            # If we have a template, then it's a sequence
            assert len(setgroup) == 1, "Got group of metadata records in template?"
            logger.debug("Creating Imagesequence from %s", setgroup[0].template)
            yield _create_imagesequence(setgroup[0], format_class, format_kwargs)
        else:
            # Without a template, it was never identified as a sequence, so an imageset
            logger.debug("Creating ImageSet from %d files", len(setgroup))
            yield _create_imageset(setgroup, format_class, format_kwargs)


def _create_imageset(records, format_class, format_kwargs=None):
    # type: (Iterable[ImageMetadataRecord], Type[dxtbx.format.Format], Dict) -> dxtbx.imageset.ImageSet
    """
    Create an ImageSet object from a set of single-image records.

    Args:
        records: Single-image metadata records to merge into a single imageset
        format_class: The format class object for these image records
        format_kwargs: Extra arguments to pass to the format class when
            creating an ImageSet

    Returns:
        An imageset for all the image records
    """
    records = list(records)
    # Nothing here should have been assigned a template parameter
    assert all(x.template is None for x in records)
    # Extract the filenames from the records; anything with a URL scheme is
    # passed through untouched, everything else is made absolute.
    filenames = [
        x.filename if get_url_scheme(x.filename) else os.path.abspath(x.filename)
        for x in records
    ]
    # Create the imageset
    imageset = dxtbx.imageset.ImageSetFactory.make_imageset(
        filenames, format_class, format_kwargs=format_kwargs, check_format=False
    )
    # Update all of the metadata for each record
    for i, r in enumerate(records):
        imageset.set_beam(r.beam, i)
        imageset.set_detector(r.detector, i)
        imageset.set_goniometer(r.goniometer, i)
        imageset.set_scan(r.scan, i)
    return imageset


def _create_imagesequence(record, format_class, format_kwargs=None):
    # type: (ImageMetadataRecord, Type[Format], Dict) -> dxtbx.imageset.ImageSequence
    """
    Create an ImageSequence object from a single templated record.

    Args:
        record: The metadata record whose template and scan describe the
            sequence
        format_class: The format class object for this record
        format_kwargs: Extra arguments to pass to the format class when
            creating the sequence

    Returns:
        An imageset representing the sequence of data
    """
    # The scan's image range is inclusive at both ends
    index_start, index_end = record.scan.get_image_range()
    # Create the sequence
    sequence = dxtbx.imageset.ImageSetFactory.make_sequence(
        template=os.path.abspath(record.template),
        indices=list(range(index_start, index_end + 1)),
        format_class=format_class,
        beam=record.beam,
        detector=record.detector,
        goniometer=record.goniometer,
        scan=record.scan,
        format_kwargs=format_kwargs,
    )
    return sequence