Source code for defdap.file_readers

# Copyright 2023 Mechanics of Microstructures Group
#    at The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
from numpy.lib.recfunctions import structured_to_unstructured
import pandas as pd
from abc import ABC, abstractmethod
import pathlib
import re

from typing import TextIO, Dict, List, Callable, Any, Type, Optional

from defdap.crystal import Phase
from defdap.quat import Quat
from defdap.utils import Datastore


[docs]class EBSDDataLoader(ABC): """Class containing methods for loading and checking EBSD data """ def __init__(self) -> None: # required metadata self.loaded_metadata = { 'shape': (0, 0), 'step_size': 0., 'acquisition_rotation': Quat(1.0, 0.0, 0.0, 0.0), 'phases': [], 'edx': {'Count': 0}, } # required data self.loaded_data = Datastore() self.loaded_data.add( 'phase', None, unit='', type='map', order=0, comment='1-based, 0 is non-indexed points', plot_params={ 'vmin': 0, } ) self.loaded_data.add( 'euler_angle', None, unit='rad', type='map', order=1, default_component='all_euler' ) self.data_format = None
[docs] @staticmethod def get_loader(data_type: str, file_name: pathlib.Path) -> 'Type[EBSDDataLoader]': if data_type is None: data_type = { '.crc': 'oxfordbinary', '.cpr': 'oxfordbinary', '.ctf': 'oxfordtext', '.ang': 'edaxang', }.get(file_name.suffix, 'oxfordbinary') data_type = data_type.lower() try: loader = { 'oxfordbinary': OxfordBinaryLoader, 'oxfordtext': OxfordTextLoader, 'edaxang': EdaxAngLoader, 'pythondict': PythonDictLoader, }[data_type] except KeyError: raise ValueError(f"No loader for EBSD data of type {data_type}.") return loader()
[docs] def check_metadata(self) -> None: """ Checks that the number of phases from metadata matches the amount of phases loaded. """ for phase in self.loaded_metadata['phases']: assert type(phase) is Phase
[docs] def check_data(self) -> None: shape = self.loaded_metadata['shape'] assert self.loaded_data.phase.shape == shape assert self.loaded_data.euler_angle.shape == (3,) + shape
# assert self.loaded_data['bandContrast'].shape == mapShape
[docs] @abstractmethod def load(self, file_name: pathlib.Path) -> None: pass
[docs]class OxfordTextLoader(EBSDDataLoader):
[docs] def load(self, file_name: pathlib.Path) -> None: """ Read an Oxford Instruments .ctf file, which is a HKL single orientation file. Parameters ---------- file_name Path to file """ # open data file and read in metadata if not file_name.is_file(): raise FileNotFoundError(f"Cannot open file {file_name}") def parse_phase() -> Phase: line_split = line.split('\t') dims = line_split[0].split(';') dims = tuple(round(float(s), 3) for s in dims) angles = line_split[1].split(';') angles = tuple(round(float(s), 3) * np.pi / 180 for s in angles) lattice_params = dims + angles phase = Phase( line_split[2], int(line_split[3]), int(line_split[4]), lattice_params ) return phase # default values for acquisition rotation in case missing in in file acq_eulers = [0., 0., 0.] with open(str(file_name), 'r') as ctf_file: for i, line in enumerate(ctf_file): if 'XCells' in line: x_dim = int(line.split()[-1]) elif 'YCells' in line: y_dim = int(line.split()[-1]) elif 'XStep' in line: self.loaded_metadata['step_size'] = float(line.split()[-1]) elif 'AcqE1' in line: acq_eulers[0] = float(line.split()[-1]) elif 'AcqE2' in line: acq_eulers[1] = float(line.split()[-1]) elif 'AcqE3' in line: acq_eulers[2] = float(line.split()[-1]) elif 'Phases' in line: num_phases = int(line.split()[-1]) self.loaded_data['phase', 'plot_params']['vmax'] = num_phases for j in range(num_phases): line = next(ctf_file) self.loaded_metadata['phases'].append(parse_phase()) # phases are last in the header, so read the column # headings then break out the loop header_text = next(ctf_file) num_header_lines = i + j + 3 break shape = (y_dim, x_dim) self.loaded_metadata['shape'] = shape self.loaded_metadata['acquisition_rotation'] = Quat.from_euler_angles( *(np.array(acq_eulers) * np.pi / 180) ) self.check_metadata() # Construct data format from table header field_lookup = { 'Phase': ('phase', 'uint8'), 'X': ('x', 'float32'), 'Y': ('y', 'float32'), 'Bands': ('numBands', 'uint8'), 'Error': ('error', 'uint8'), 'Euler1': ('ph1', 'float32'), 'Euler2': ('phi', 'float32'), 'Euler3': ('ph2', 'float32'), 'MAD': ('MAD', 'float32'), # Mean Angular Deviation 'BC': ('BC', 'uint8'), # Band Contrast 'BS': ('BS', 'uint8'), # Band Slope } keep_col_names = ('phase', 'ph1', 'phi', 'ph2', 'BC', 'BS', 'MAD') data_format = [] load_cols = [] try: for i, col_title in enumerate(header_text.split()): if field_lookup[col_title][0] in keep_col_names: data_format.append(field_lookup[col_title]) load_cols.append(i) except KeyError: raise TypeError("Unknown data in EBSD file.") self.data_format = np.dtype(data_format) # now read the data from file data = np.loadtxt( str(file_name), dtype=self.data_format, usecols=load_cols, skiprows=num_header_lines ) self.loaded_data.add( 'band_contrast', data['BC'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'cmap': 'gray', 'clabel': 'Band contrast', } ) self.loaded_data.add( 'band_slope', data['BS'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'cmap': 'gray', 'clabel': 'Band slope', } ) self.loaded_data.add( 'mean_angular_deviation', data['MAD'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'clabel': 'Mean angular deviation', } ) self.loaded_data.phase = data['phase'].reshape(shape) euler_angle = structured_to_unstructured( data[['ph1', 'phi', 'ph2']].reshape(shape)).transpose((2, 0, 1)) euler_angle *= np.pi / 180 self.loaded_data.euler_angle = euler_angle self.check_data()
[docs]class EdaxAngLoader(EBSDDataLoader):
[docs] def load(self, file_name: pathlib.Path) -> None: """ Read an EDAX .ang file. Parameters ---------- file_name Path to file """ # open data file and read in metadata if not file_name.is_file(): raise FileNotFoundError(f"Cannot open file {file_name}") i_phase = 1 # parse header lines (starting with #) with open(str(file_name), 'r') as ang_file: while True: line = ang_file.readline() if not line.startswith('#'): # end of header break # remove # line = line[1:].strip() if line.startswith('Phase'): if int(line.split()[1]) != i_phase: raise ValueError('Phases not sequential in file?') phase_lines = read_until_string( ang_file, '#', exact=True, line_process=lambda l: l[1:].strip() ) self.loaded_metadata['phases'].append( EdaxAngLoader.parse_phase(phase_lines) ) i_phase += 1 elif line.startswith('GRID'): if line.split()[-1] != 'SqrGrid': raise ValueError('Only square grids supported') elif line.startswith('XSTEP'): self.loaded_metadata['step_size'] = float(line.split()[-1]) elif line.startswith('NCOLS_ODD'): xdim = int(line.split()[-1]) elif line.startswith('NROWS'): ydim = int(line.split()[-1]) shape = (ydim, xdim) self.loaded_metadata['shape'] = shape self.check_metadata() # Construct fixed data format self.data_format = np.dtype([ ('ph1', 'float32'), ('phi', 'float32'), ('ph2', 'float32'), # ('x', 'float32'), # ('y', 'float32'), ('IQ', 'float32'), ('CI', 'float32'), ('phase', 'uint8'), # ('SE_signal', 'float32'), ('FF', 'float32'), ]) load_cols = (0, 1, 2, 5, 6, 7, 8, 9) # now read the data from file data = np.loadtxt( str(file_name), dtype=self.data_format, comments='#', usecols=load_cols ) self.loaded_data.add( 'image_quality', data['IQ'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'clabel': 'Image quality', } ) self.loaded_data.add( 'confidence_index', data['CI'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'clabel': 'Confidence index', } ) self.loaded_data.add( 'fit_factor', data['FF'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'clabel': 'Fit factor', } ) self.loaded_data.phase = data['phase'].reshape(shape) + 1 self.loaded_data['phase', 'plot_params']['vmax'] = len(self.loaded_metadata['phases']) # flatten the structured dtype euler_angle = structured_to_unstructured( data[['ph1', 'phi', 'ph2']].reshape(shape)).transpose((2, 0, 1)) euler_angle[0] -= np.pi / 2 euler_angle[0, euler_angle[0] < 0.] += 2 * np.pi self.loaded_data.euler_angle = euler_angle self.check_data()
[docs] @staticmethod def parse_phase(lines) -> Phase: for line in lines: line = line.split() if line[0] == 'MaterialName': name = line[1] if line[0] == 'Symmetry': point_group = line[1] if point_group in ('43', 'm3m'): # cubic high laue_group = 11 # can't determine but set to BCC for now space_group = 229 elif point_group == '6/mmm': # hex high laue_group = 9 space_group = None else: raise ValueError(f'Unknown crystal symmetry {point_group}') elif line[0] == 'LatticeConstants': dims = line[1:4] dims = tuple(round(float(s), 3) for s in dims) angles = line[4:7] angles = tuple(round(float(s), 3) * np.pi / 180 for s in angles) lattice_params = dims + angles return Phase(name, laue_group, space_group, lattice_params)
[docs]class OxfordBinaryLoader(EBSDDataLoader):
[docs] def load(self, file_name: pathlib.Path) -> None: """Read Oxford Instruments .cpr/.crc file pair. Parameters ---------- file_name Path to file """ self.load_oxford_cpr(file_name) self.load_oxford_crc(file_name)
[docs] def load_oxford_cpr(self, file_name: pathlib.Path) -> None: """ Read an Oxford Instruments .cpr file, which is a metadata file describing EBSD data. Parameters ---------- file_name Path to file """ comment_char = ';' file_name = file_name.with_suffix('.cpr') if not file_name.is_file(): raise FileNotFoundError("Cannot open file {}".format(file_name)) # CPR file is split into groups, load each group into a # hierarchical dict metadata = dict() group_pat = re.compile(r"\[(.+)\]") def parse_line(line: str, group_dict: Dict) -> None: try: key, val = line.strip().split('=') group_dict[key] = val except ValueError: pass with open(str(file_name), 'r') as cpr_file: while True: line = cpr_file.readline() if not line: # End of file break if line.strip() == '' or line.strip()[0] == comment_char: # Skip comment or empty line continue group_name = group_pat.match(line.strip()).group(1) group_dict = dict() read_until_string(cpr_file, '[', comment_char=comment_char, line_process=lambda l: parse_line(l, group_dict)) metadata[group_name] = group_dict # Create phase objects and move metadata to object metadata dict x_dim = int(metadata['Job']['xCells']) y_dim = int(metadata['Job']['yCells']) self.loaded_metadata['shape'] = (y_dim, x_dim) self.loaded_metadata['step_size'] = float(metadata['Job']['GridDistX']) self.loaded_metadata['acquisition_rotation'] = Quat.from_euler_angles( float(metadata['Acquisition Surface']['Euler1']) * np.pi / 180., float(metadata['Acquisition Surface']['Euler2']) * np.pi / 180., float(metadata['Acquisition Surface']['Euler3']) * np.pi / 180. ) num_phases = int(metadata['Phases']['Count']) for i in range(num_phases): phase_metadata = metadata['Phase{:}'.format(i + 1)] self.loaded_metadata['phases'].append(Phase( phase_metadata['StructureName'], int(phase_metadata['LaueGroup']), int(phase_metadata.get('SpaceGroup', 0)), ( round(float(phase_metadata['a']), 3), round(float(phase_metadata['b']), 3), round(float(phase_metadata['c']), 3), round(float(phase_metadata['alpha']), 3) * np.pi / 180, round(float(phase_metadata['beta']), 3) * np.pi / 180, round(float(phase_metadata['gamma']), 3) * np.pi / 180 ) )) self.loaded_data['phase', 'plot_params']['vmax'] = num_phases # Deal with EDX data edx_fields = {} if 'EDX Windows' in metadata: self.loaded_metadata['edx'] = metadata['EDX Windows'] count = int(self.loaded_metadata['edx']['Count']) self.loaded_metadata['edx']['Count'] = count for i in range(1, count + 1): name = self.loaded_metadata['edx'][f"Window{i}"] edx_fields[100+i] = (f'EDX {name}', 'float32') self.check_metadata() # Construct binary data format from listed fields unknown_field_count = 0 data_format = [('phase', 'uint8')] field_lookup = { 3: ('ph1', 'float32'), 4: ('phi', 'float32'), 5: ('ph2', 'float32'), 6: ('MAD', 'float32'), # Mean Angular Deviation 7: ('BC', 'uint8'), # Band Contrast 8: ('BS', 'uint8'), # Band Slope 10: ('numBands', 'uint8'), 11: ('AFI', 'uint8'), # Advanced Fit index. legacy 12: ('IB6', 'float32') # ? } field_lookup.update(edx_fields) try: for i in range(int(metadata['Fields']['Count'])): field_id = int(metadata['Fields']['Field{:}'.format(i + 1)]) data_format.append(field_lookup[field_id]) except KeyError: print(f'\nUnknown field in file with key {field_id}. ' f'Assuming float32 data.') unknown_field_count += 1 data_format.append((f'unknown_{unknown_field_count}', 'float32')) self.data_format = np.dtype(data_format)
[docs] def load_oxford_crc(self, file_name: pathlib.Path) -> None: """Read binary EBSD data from an Oxford Instruments .crc file Parameters ---------- file_name Path to file """ shape = self.loaded_metadata['shape'] file_name = file_name.with_suffix('.crc') if not file_name.is_file(): raise FileNotFoundError("Cannot open file {}".format(file_name)) # load binary data from file data = np.fromfile(str(file_name), self.data_format, count=-1) self.loaded_data.add( 'band_contrast', data['BC'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'cmap': 'gray', 'clabel': 'Band contrast', } ) self.loaded_data.add( 'band_slope', data['BS'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'cmap': 'gray', 'clabel': 'Band slope', } ) self.loaded_data.add( 'mean_angular_deviation', data['MAD'].reshape(shape), unit='', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'clabel': 'Mean angular deviation', } ) self.loaded_data.phase = data['phase'].reshape(shape) # flatten the structured dtype self.loaded_data.euler_angle = structured_to_unstructured( data[['ph1', 'phi', 'ph2']].reshape(shape)).transpose((2, 0, 1)) if self.loaded_metadata['edx']['Count'] > 0: EDXFields = [key for key in data.dtype.fields.keys() if key.startswith('EDX')] for field in EDXFields: self.loaded_data.add( field, data[field].reshape(shape), unit='counts', type='map', order=0, plot_params={ 'plot_colour_bar': True, 'clabel': field + ' counts', } ) self.check_data()
[docs]class PythonDictLoader(EBSDDataLoader):
[docs] def load(self, data_dict: Dict[str, Any]) -> None: """Construct EBSD data from a python dictionary. Parameters ---------- data_dict Dictionary with keys: 'step_size' 'phases' 'phase' 'euler_angle' 'band_contrast' """ self.loaded_metadata['shape'] = data_dict['phase'].shape self.loaded_metadata['step_size'] = data_dict['step_size'] assert type(data_dict['phases']) is list self.loaded_metadata['phases'] = data_dict['phases'] self.check_metadata() self.loaded_data.add( 'band_contrast', data_dict['band_contrast'], unit='', type='map', order=0 ) self.loaded_data.phase = data_dict['phase'] self.loaded_data['phase', 'plot_params']['vmax'] = len(self.loaded_metadata['phases']) self.loaded_data.euler_angle = data_dict['euler_angle'] self.check_data()
[docs]class DICDataLoader(ABC): """Class containing methods for loading and checking HRDIC data """ def __init__(self) -> None: self.loaded_metadata = { 'format': '', 'version': '', 'binning': '', 'shape': (0, 0), } # required data self.loaded_data = Datastore() self.loaded_data.add( 'coordinate', None, unit='px', type='map', order=1, default_component='magnitude', plot_params={ 'plot_colour_bar': True, 'clabel': 'Coordinate', } ) self.loaded_data.add( 'displacement', None, unit='px', type='map', order=1, default_component='magnitude', plot_params={ 'plot_colour_bar': True, 'clabel': 'Displacement', } )
[docs] @staticmethod def get_loader(data_type: str) -> 'Type[DICDataLoader]': if data_type is None: data_type = "Davis" data_type = data_type.lower() try: loader = { 'davis': DavisLoader, 'openpiv': OpenPivLoader, }[data_type] except KeyError: raise ValueError(f"No loader for DIC data of type {data_type}.") return loader()
[docs] def checkMetadata(self) -> None: return
[docs] def check_data(self) -> None: """ Calculate size of map from loaded data and check it matches values from metadata. """ # check binning binning = self.loaded_metadata['binning'] binning_x = min(abs(np.diff(self.loaded_data.coordinate[0].flat))) binning_y = max(abs(np.diff(self.loaded_data.coordinate[1].flat))) if not (binning_x == binning_y == binning): raise ValueError( f'Binning of data and header do not match `{binning_x}`, ' f'`{binning_y}`, `{binning}`' ) # check shape coord = self.loaded_data.coordinate shape = (coord.max(axis=(1, 2)) - coord.min(axis=(1, 2))) / binning + 1 shape = tuple(shape[::-1].astype(int)) if shape != self.loaded_metadata['shape']: raise ValueError( f'Dimensions of data and header do not match `{shape}, ' f'`{self.loaded_metadata["shape"]}`' )
[docs] @abstractmethod def load(self, file_name: pathlib.Path) -> None: pass
[docs]class DavisLoader(DICDataLoader):
[docs] def load(self, file_name: pathlib.Path) -> None: """ Load from Davis .txt file. Parameters ---------- file_name Path to file """ if not file_name.is_file(): raise FileNotFoundError("Cannot open file {}".format(file_name)) with open(str(file_name), 'r') as f: header = f.readline() metadata = header.split() # Software name and version self.loaded_metadata['format'] = metadata[0].strip('#') self.loaded_metadata['version'] = metadata[1] # Sub-window width in pixels self.loaded_metadata['binning'] = int(metadata[3]) # shape of map (from header) self.loaded_metadata['shape'] = (int(metadata[4]), int(metadata[5])) self.checkMetadata() data = pd.read_table(str(file_name), delimiter='\t', skiprows=1, header=None).values data = data.reshape(self.loaded_metadata['shape'] + (-1,)) data = data.transpose((2, 0, 1)) self.loaded_data.coordinate = data[:2] self.loaded_data.displacement = data[2:] self.check_data()
[docs] @staticmethod def load_davis_image_data(file_name: pathlib.Path) -> np.ndarray: """ A .txt file from DaVis containing a 2D image Parameters ---------- file_name Path to file Returns ------- np.ndarray Array of data. """ if not file_name.is_file(): raise FileNotFoundError("Cannot open file {}".format(file_name)) data = pd.read_table(str(file_name), delimiter='\t', skiprows=1, header=None) return np.array(data)
[docs]class OpenPivLoader(DICDataLoader):
[docs] def load(self, file_name: pathlib.Path) -> None: """ Load from Open PIV .txt file. Parameters ---------- file_name Path to file """ if not file_name.is_file(): raise FileNotFoundError(f"Cannot open file {file_name}") with open(str(file_name), 'r') as f: header = f.readline()[1:].split() data = np.loadtxt(f) col = { 'x': 0, 'y': 1, 'u': 2, 'v': 3, } # Software name and version self.loaded_metadata['format'] = 'OpenPIV' self.loaded_metadata['version'] = 'n/a' # Sub-window width in pixels binning_x = int(np.min(np.abs(np.diff(data[:, col['x']])))) binning_y = int(np.max(np.abs(np.diff(data[:, col['y']])))) assert binning_x == binning_y binning = binning_x self.loaded_metadata['binning'] = binning # shape of map (from header) shape = data[:, [col['y'], col['x']]].max(axis=0) + binning / 2 assert np.allclose(shape % binning, 0.) shape = tuple((shape / binning).astype(int).tolist()) self.loaded_metadata['shape'] = shape self.checkMetadata() data = data.reshape(shape + (-1,))[::-1].transpose((2, 0, 1)) self.loaded_data.coordinate = data[[col['x'], col['y']]] self.loaded_data.displacement = data[[col['u'], col['v']]] self.check_data()
[docs]def read_until_string( file: TextIO, term_string: str, comment_char: str = '*', line_process: Optional[Callable[[str], Any]] = None, exact: bool = False ) -> List[Any]: """Read lines in a file until a line starting with the `termString` is encountered. The file position is returned before the line starting with the `termString` when found. Comment and empty lines are ignored. Parameters ---------- file An open python text file object. term_string String to terminate reading. comment_char Character at start of a comment line to ignore. line_process Function to apply to each line when loaded. exact A line must exactly match `termString` to stop. Returns ------- list List of processed lines loaded from file. """ lines = [] while True: curr_pos = file.tell() # save position in file line = file.readline() if (not line or (exact and line.strip() == term_string) or (not exact and line.strip().startswith(term_string))): file.seek(curr_pos) # return to before prev line break if line.strip() == '' or line.strip()[0] == comment_char: # Skip comment or empty line continue if line_process is not None: line = line_process(line) lines.append(line) return lines