Source code for xml_to_hdf5

""" When very large experimental files are generated from the biaxial testing machine, it is unable to export as raw
data. To alleviate this issue, the present module can read the raw xml file that the biaxial testing machine uses
internally to store the testing data. That file can be found in the test folder: e.g.
``<TestName>/test_runs/<TestRunName>/Data/daqTaskActivity1.xml``
"""
import sys
import os

import numpy as np
import h5py


[docs]def main(argv): if len(argv) < 2: print("1) A file name (excluding suffix) must be given)") print("2) The outer diameter (in mm) must be given") return file = argv[1] outer_diameter = float(argv[2]) rdata, runits = get_data(file) data, units, dtypes = reduce_data(rdata, runits) comp_data, comp_units, comp_attributes = compensate(data, units, outer_diameter) save_data(file, comp_data, comp_units, dtypes, global_attributes=comp_attributes)
[docs]def save_data(file, data, units, dtypes=None, global_attributes=None): """Save data to hdf5 file for later use :param file: Path to hdf file to save data to, excluding .hdf suffix :type file: str :param data: Data dictionary where each item is a numpy array :type data: dict :param units: Dictionary with same keys as data, describing the units of the data :type units: dict :param dtypes: Dictionary with the data types with which the data should be saved :type dtypes: dict :param global_attributes: Attributes to add the the top level :type global_attributes: dict """ dtypes_ = {} if dtypes is None else dtypes fname = file if file.endswith('.hdf') else file + '.hdf' with h5py.File(fname, 'w') as hdf: for name in data: if name in dtypes_: ds = hdf.create_dataset(name, data=data[name], dtype=dtypes_[name]) else: ds = hdf.create_dataset(name, data=data[name]) ds.attrs["unit"] = units[name] if global_attributes is not None: for key in global_attributes: hdf.attrs[key] = global_attributes[key] return fname
[docs]def join_data(file, subfiles): fname = file if file.endswith('.hdf') else file + '.hdf' with h5py.File(subfiles[0], "r") as first: with h5py.File(fname, 'w') as hdf: for name in first: data = first[name][:] for subfile in subfiles[1:]: with h5py.File(subfile, "r") as sub: if name=="acnt" or name=="tcnt": subv = sub[name][:] data = np.append(data, subv + (data[-1]-subv[0])) else: data = np.append(data, sub[name][:]) ds = hdf.create_dataset(name, data=data, dtype=np.dtype(data[0])) for key in first.attrs: hdf.attrs[key] = first.attrs[key] return fname
[docs]def estimate_lines(file): fname = file if file.endswith('.xml') else file + '.xml' size_per_line = 341.57 # Bytes/line file_size = os.path.getsize(fname) # Bytes estimated_lines = file_size / size_per_line # line return estimated_lines
[docs]def get_data(file, max_lines=None, start_line=0): """ Get data from the biaxial machine's raw xml file. :param file: Path to xml file :type file: str :param max_lines: Max "line number" (i.e. datapoints) to read from xml file. Defaults to None, meaning read until the end :type max_lines: int :param start_line: Which line (time step number) to start at. Useful to get data in parts for large files. :type start_line: int :returns: Dictionary with key based on names in the xml file and content as numpy arrays (float64) Dictionary with key based on names in the xml file and content strings with unit description :rtype: dict, dict """ fname = file if file.endswith('.xml') else file + '.xml' if max_lines is None: estimated_lines = estimate_lines(fname) max_lines = np.inf else: estimated_lines = max_lines with open(fname, 'r') as xml: # Read until signal description list starts for line in xml: if line.strip() == "<Signals>": break # Acquire signal names and units names = [] units = {} for line in xml: if line.strip() == "</Signals>": break name, unit = get_name_and_unit(line) names.append(name) units[name] = unit data = {name: [] for name in names} name_ind = 0 # Not actually needed row_count = 0 if start_line>0: for line in xml: if line.strip() == "<Scan>": row_count += 1 if row_count >= start_line: break for line in xml: if line.strip().startswith("<Value>"): name = names[name_ind] try: data[name].append(get_data_value(line, np.float64)) except ValueError: print("Could not convert a data value, could be due to partial file?") break name_ind += 1 elif line.strip() == "<Scan>": name_ind = 0 row_count += 1 if row_count >= max_lines: break if row_count % 100000 == 0: progress = 100 * (row_count-start_line) / (estimated_lines-start_line) sys.stdout.write("\rEstimated progress: {:5.1f} %".format(progress)) sys.stdout.flush() print('\n') if name_ind != len(names) or name_ind != 0: print("Last data point not completely read, is the file complete?") row_count = np.min([len(data[name]) for name in names]) print("Total number of lines: ", row_count) for name in names: data[name] = np.array(data[name])[:row_count] return data, units
[docs]def get_name_and_unit(line): """ Get the name and unit of the given signal description. An example line is ``<Signal Name="Running Time" InternalName="Running Time" Dimension="time" Unit="sec" />\\n`` :param line: The line in the xml file :type line: str :returns: name, unit :rtype: str, str """ name = line.split("Name=")[-1].split('"')[1] unit = line.split("Unit=")[-1].split('"')[1] return name, unit
[docs]def get_data_value(line, dtype): """ Get the data value from the line, an example line is ``<Value>8.14404296875</Value>\\n`` :param line: The line in the xml file :type line: str :param dtype: The data type to convert to :type dtype: type :returns: The value in the requested data type :rtype: dtype """ value = line.split("<Value>")[-1].split("</Value>")[0] return dtype(value)
[docs]def reduce_data(raw_data, raw_units, save_disp=False, use_tcnt=False, sensor_dtype=np.float32, time_accuracy=1.e-4, cnt_tol=0.1): """ Reduce the amount of data stored by 1) Choose appropriate datatypes for sensor values 2) Automatically determine datatype for time column 3) Not saving displacement/rotation values unless requested 4) Save only the indices when the counter changes 5) Join counters unless requested not to :param raw_data: The raw data read from the xml file :type raw_data: dict :param raw_units: The units corresponding to raw_data :type raw_units: dict :param save_disp: Should displacements and rotations be saved? Defaults to False :type save_disp: bool :param use_tcnt: Should the torsional counter be included as well?, Defaults to False :type use_tcnt: bool :param sensor_dtype: Datatype for sensor values (load,strain and disp/rota) :type sensor_dtype: type :param time_accuracy: The minimum time difference that should be stored accurately :type time_accuracy: float :param cnt_tol: The minimum counter change required to detect new count :type cnt_tol: float :returns: (data, units, dtype) data: New dictionary with keys "forc", "torq", "astr", "tstr", "disp", "rota", "time", ["cnt" or ("acnt", "tcnt")] whichever is available. Note that only the data in the counters are modified, otherwise no conversion from the machine data is performed. units: Unit strings (converted to ascii characters to avoid strange unicode issues) dtype: The datatype with which the data should be stored. :rtype: (dict, dict, dict) """ data = {} units = {} dtypes = {} # Save sensor values (load and strains) sensor_keys = {"forc": "Axial Force", "torq": "Torsional Torque", "astr": "Axial Strain", "tstr": "Torsional Strain (ang)"} for skey in sensor_keys: if sensor_keys[skey] in raw_data: data[skey] = raw_data[sensor_keys[skey]] dtypes[skey] = sensor_dtype units[skey] = str2ascii(raw_units[sensor_keys[skey]]) # Save displacements (if requested) disp_keys = {"disp": "Axial Displacement", "rota": "Torsional Rotation"} if save_disp: for dkey in disp_keys: if disp_keys[dkey] in raw_data: data[dkey] = raw_data[disp_keys[dkey]] dtypes[dkey] = sensor_dtype units[dkey] = str2ascii(raw_units[disp_keys[dkey]]) # Save counter information # Want a vector that returns the index when we change ?cnt by giving 2x the value we change to # 2x because half counters are used as well. cnt_keys = {"acnt": "Axial Segment Count"} if use_tcnt: cnt_keys["tcnt"] = "Torsional Segment Count" for ckey in cnt_keys: raw_cnt = raw_data[cnt_keys[ckey]] c_changes = np.where(raw_cnt[1:] > (raw_cnt[:-1] + cnt_tol))[0] cnt = [0] for c_change in c_changes: change_to = int(2*(raw_cnt[c_change+1]-raw_cnt[0])) while len(cnt) < (change_to-1): cnt.append(cnt[-1]) cnt.append(c_change+1) data[ckey] = np.array(cnt, dtype=np.uint32) # OK up to 4e9 datapoints dtypes[ckey] = np.uint32 units[ckey] = "-" # Save time trkey = "Running Time" tkey = "time" if trkey in raw_data: data[tkey] = raw_data[trkey] units[tkey] = raw_units[trkey] if (data[tkey][-1]/time_accuracy) > 1.e7: dtypes[tkey] = np.float64 else: dtypes[tkey] = np.float32 return data, units, dtypes
[docs]def str2ascii(raw_str): # Convert a string to ascii by removing non-ascii characters return raw_str.encode("ascii", "ignore").decode()
[docs]def compensate(data, units, outer_diameter, tstr_sign=0): """ Compensate data wrt. units, stiffness and cross-talk. :param data: The data dictionary to be compensated (as returned from :py:func:`reduce_data`) :param units: Dictionary of units (as returned from :py:func:`reduce_data`) :param outer_diameter: The outer diameter of the test bar [mm] :param tstr_sign: The sign with which to scale the torsional strain in relation to the torque. -1 if extensometer text upside down, 1 otherwise). If 0 (default), try to automatically detect by correlation with the torque. :returns: The compensated data dictionary, adjusted units, attributes describing compensation :rtype: (dict, dict, dict) """ # The units with which the data will be output fixed_units = {'forc': 'N', 'torq': 'Nmm', 'disp': 'mm', 'rota': 'rad', 'astr': '-', 'tstr': 'rad', 'time': 's'} # Compensation values from Meyer et al. (2018) [https://doi.org/10.1016/j.ijsolstr.2017.10.007] k_axial = 198.71e3 # N/mm Machine axial stiffness, to compensate disp values k_torsional = 25920e3 # Nmm/rad Machine torsional stiffness, to compensate rota values torque_per_force = 0.0901 # Nmm/N Cross talk force to torque, note sign change due to reversed rotation axis. # Other parameters ext_cal_dia = 10.0 # mm The diameter for which the extensometer was calibrated. # Automatically determine sign of shear strain scaling if tstr_sign == 0 and 'tstr' in data: # Remove last 10 % when looking max min to avoid evaluating after failure. n = int(data['tstr'].shape[0]*0.9) # Find max and min torque i_max = np.argmax(data['torq'][:n]) i_min = np.argmin(data['torq'][:n]) # Get sign of inclination for torque versus shear strain. If positive maintain sign versus torque. tsgn = np.sign((data['torq'][i_max] - data['torq'][i_min]) / (data['tstr'][i_max] - data['tstr'][i_min])) else: tsgn = tstr_sign # Scale channels # If KeyError is thrown below, it is suitable to add more units in the following dictionary scale_factors = {'forc': {'N': 1.0, 'kN': 1000.0}, 'torq': {'Nm': -1000.0, 'Nmm': -1.0, 'kNm': -1.0e6, 'kNmm': -1.0e3}, # "-" due to machine 'disp': {'m': 1000.0, 'mm': 1.0}, 'rota': {'rad': -1.0}, # "-" due to machine 'astr': {length_measure + '/' + length_measure: 1.0 for length_measure in ['mm', 'm', 'inch']}, # e.g. m/m 'tstr': {'rad': -tsgn * ext_cal_dia / outer_diameter}, # "-" due to machine 'time': {'sec': 1.0, 's': 1.0}, } for key in scale_factors: if key in data: try: sfac = scale_factors[key][units[key]] except KeyError as ke: if not units[key] in scale_factors[key]: print("Unknown unit {:s} for variable {:s}".format(units[key], key)) print("Available units are: {:s}".format(",".join(['"' + u + '"' for u in scale_factors[key]]))) print("Consider adding additional unit conversions to 'scale_factors' (see above in the code)") raise ke data[key] = sfac * data[key] # Compensate for machine stiffness if 'disp' in data: data['disp'] -= data['forc']/k_axial if 'rota' in data: data['rota'] -= data['torq']/k_torsional stiffness_comp = '' if any([c in data for c in ['disp', 'rota']]): stiffness_comp = 'Machine stiffness compensation:\n' stiffness_comp += ' disp = disp - forc * ({:0.6e} mm/N)\n'.format(1/k_axial) if 'disp' in data else '' stiffness_comp += ' rota = rota - torq * ({:0.6e} rad/Nmm)\n'.format(1/k_torsional) if 'rota' in data else '' # Compensate for cross talk data['torq'] -= data['forc']*torque_per_force # Write modified units new_units = {key: (fixed_units[key] if key in fixed_units else units[key]) for key in data} info = ('' + 'Rotation (torq,rota) reversed from machine\n' + '' if (tstr_sign == 0 or 'tstr' not in data) else ('tstr_sign = ' + str(tsgn) + '\n') + stiffness_comp # Only add if disp and rota part of data + 'Cross talk compensation:\n' + ' torq = torq - forc * ({:0.4f} Nmm/N)\n'.format(torque_per_force) + 'Reference: https://doi.org/10.1016/j.ijsolstr.2017.10.007\n') attributes = {'info': info, 'cross_talk_torque_per_force': torque_per_force, 'axial_stiffness': k_axial, 'torsional_stiffness': k_torsional, 'tstr_sign': tsgn, } return data, new_units, attributes
if __name__ == '__main__': main(sys.argv)