"""
=============
StoG
=============
This module defines the StoG class
that tries to replicate the previous
stog program behavior in an organized fashion
with the ability to re-construct the workflow.
"""
import json
import numpy as np
import pandas as pd
from pystog.utils import create_domain, RealSpaceChoices, ReciprocalSpaceChoices
from pystog.converter import Converter
from pystog.transformer import Transformer
from pystog.fourier_filter import FourierFilter
# Required for non-display environment (i.e. Travis-CI)
import os
import matplotlib as mpl
if os.environ.get('DISPLAY', '') == '':
print('no display found. Using non-interactive Agg backend')
mpl.use('Agg')
import matplotlib.pyplot as plt # noqa: E402
[docs]class StoG(object):
"""The StoG class is used to put together
the Converter, Transformer, and FourierFilter
class functionalities to reproduce the original
**stog** Fortran program behavior. This class is meant to
put together the functionality of the classes
into higher-level calls to construct workflows
for merging and processing multiple recprical space
functions into a final output real space function.
This pythonized-version of the original Fortran **stog**
uses pandas and numpy for data storage, organization, and
manipulation and matplotlib for diagonostic visualization
"under the hood".
:examples:
>>> import json
>>> from pystog import StoG
>>> with open("../data/examples/argon_pystog.json", 'r') as f:
>>> kwargs = json.load(f)
>>> stog = StoG(**kwargs)
>>> stog.read_all_data()
>>> stog.merge_data()
>>> stog.write_out_merged_data()
"""
def __init__(self, **kwargs):
# General attributes
self.__xdecimals = 2
self.__ydecimals = 16
self.__xmin = 100
self.__xmax = 0
self.__qmin = None
self.__qmax = None
self.__files = None
self.__real_space_function = "g(r)"
self.__rmin = 0.0
self.__rmax = 50.0
self.__rdelta = 0.01
self.__update_dr()
self.__density = 1.0
self.__bcoh_sqrd = 1.0
self.__btot_sqrd = 1.0
self.__low_q_correction = False
self.__lorch_flag = False
self.__fourier_filter_cutoff = None
self.__plot_flag = True
self.__plotting_kwargs = {'figsize': (16, 8),
'style': '-',
'ms': 1,
'lw': 1,
}
self.__merged_opts = {"Y": {"Offset": 0.0, "Scale": 1.0}}
self.__stem_name = "out"
# DataFrames for total scattering functions
self.__df_individuals = pd.DataFrame()
self.__df_sq_individuals = pd.DataFrame()
self.__df_sq_master = pd.DataFrame()
self.__df_gr_master = pd.DataFrame()
# Attributes that do not (currently) change
self.__sq_title = "S(Q) Merged"
self.__qsq_minus_one_title = "Q[S(Q)-1] Merged"
self._ft_title = "FT term"
self.__sq_ft_title = "S(Q) FT"
self.__fq_title = "F(Q) Merged"
self.__dr_ft_title = "D(r) FT"
self.__GKofR_title = "G(r) (Keen Version)"
# Set real space title attributes
self.__gr_title = "%s Merged" % self.__real_space_function
self.__gr_ft_title = "%s FT" % self.__real_space_function
self.__gr_lorch_title = "%s FT Lorched" % self.__real_space_function
# Interior class attributes
#: The **converter** attribute defines the Converter
#: which is used to do all inner conversions necessary
#: to go from the input reciprocal space functions,
#: produce diagnostics for the selected real space functions,
#: and finally output the desired real space function
#: Must of type :class:`pystog.converter.Converter`
self.converter = Converter()
#: The **transformer** attribute defines the Transformer
#: which is used to do all Fourier transforms necessary
#: from reciprocal space to real space and vice versa.
#: Must of type :class:`pystog.transformer. Transformer`
self.transformer = Transformer()
#: The **filter** attribute defines the FourierFilter
#: which is used to do all Fourier filtering if the
#: **fourier_filter_cutoff** attribute is supplied.
#: Must of type :class:`pystog.fourier_filter.FourierFilter`
self.filter = FourierFilter()
# Use key-word arguments to set attributes
self.__kwargs2attr(kwargs)
def __kwargs2attr(self, kwargs):
"""Takes the key-word arguments supplied to the
initialization of the class and maps them to
attirbutes in the class. Commonly get the **kwargs**
from the JSON input file.
"""
if "Files" in kwargs:
self.files = kwargs["Files"]
if "RealSpaceFunction" in kwargs:
self.real_space_function = str(kwargs["RealSpaceFunction"])
if "Rmin" in kwargs:
self.rmin = float(kwargs["Rmin"])
if "Rmax" in kwargs:
self.rmax = float(kwargs["Rmax"])
if "Rdelta" in kwargs:
self.rdelta = kwargs["Rdelta"]
elif "Rpoints" in kwargs:
self.rdelta = self.rmax / kwargs["Rpoints"]
if "NumberDensity" in kwargs:
self.density = kwargs["NumberDensity"]
if 'OmittedXrangeCorrection' in kwargs:
self.low_q_correction = kwargs['OmittedXrangeCorrection']
if "LorchFlag" in kwargs:
self.lorch_flag = kwargs["LorchFlag"]
if "FourierFilter" in kwargs:
if "Cutoff" in kwargs["FourierFilter"]:
self.fourier_filter_cutoff = kwargs["FourierFilter"]["Cutoff"]
if "PlotFlag" in kwargs:
self.plot_flag = kwargs["PlotFlag"]
if "<b_coh>^2" in kwargs:
self.bcoh_sqrd = kwargs["<b_coh>^2"]
if "<b_tot^2>" in kwargs:
self.btot_sqrd = kwargs["<b_tot^2>"]
if "Merging" in kwargs:
self.merged_opts = kwargs["Merging"]
if "Transform" in kwargs['Merging']:
transform_opts = kwargs['Merging']["Transform"]
if "Qmin" in transform_opts:
self.qmin = transform_opts["Qmin"]
if "Qmax" in transform_opts:
self.qmax = transform_opts["Qmax"]
if "Outputs" in kwargs:
if "StemName" in kwargs["Outputs"]:
self.stem_name = kwargs["Outputs"]["StemName"]
# General attributes
@property
def xmin(self):
"""The minimum X value of all datasets to
use for Fourier transforms (from recirocal space -> real space)
:getter: Returns the current set value
:setter: Set the xmin value for the Fourier transforms
:type: float
"""
return self.__xmin
@xmin.setter
def xmin(self, value):
self.__xmin = value
@property
def xmax(self):
"""The maximum X value of all datasets to
use for Fourier transforms (from recirocal space -> real space)
:getter: Returns the current set value
:setter: Set the xmax value for the Fourier transforms
:type: float
"""
return self.__xmax
@xmax.setter
def xmax(self, value):
self.__xmax = value
@property
def qmin(self):
"""The :math:`Q_{min}` value to use for the Fourier
transforms (from recirocal space -> real space). This
overrides **xmin** attribute if **xmin** < **qmin**.
:getter: Returns the current set value
:setter: Set the :math:`Q_{min}` value for the Fourier transforms
:type: float
"""
return self.__qmin
@qmin.setter
def qmin(self, value):
self.__qmin = value
@property
def qmax(self):
"""The :math:`Q_{max}` value to use for the Fourier
transforms (from recirocal space -> real space). This
overrides **xmax** attribute if **qmax** < **xmax**.
:getter: Returns the current set value
:setter: Set the qmax value for the Fourier transforms
:type: float
"""
return self.__qmax
@qmax.setter
def qmax(self, value):
self.__qmax = value
@property
def files(self):
"""The files that contain the reciprocal space data
to merge together.
:getter: Current list of files to merge
:setter: Set the list of files to merg
:type: list
"""
return self.__files
@files.setter
def files(self, file_list):
self.__files = file_list
[docs] def append_file(self, new_file):
"""Appends a file to the file list
:param new_file: New file name to append
:type new_file: str
:return: File list with appended new_file
:rtype: list
"""
self.files = self.files + [new_file]
return self.files
[docs] def extend_file_list(self, new_files):
"""Extend the file list with a list of new files
:param new_files: List of new files
:type new_file: list
:return: File list extended by new_files
:rtype: list
"""
self.files.extend(new_files)
return self.files
def __update_dr(self):
"""Uses **rdelta** and **rmax** attributes (:math:`\\Delta r` and
:math:`R_{max}`, respectively) to construct **dr** attribute
(:math:`r`-space vector) via its setter
"""
self.dr = create_domain(self.rmin, self.rmax, self.rdelta)
@property
def rdelta(self):
"""The :math:`\\Delta r` for the :math:`r`-space vector
:getter: Return :math:`\\Delta r` value
:setter: Set the :math:`\\Delta r` value
and update :math:`r`-space vector
via the **dr** attribute
:type: value
"""
return self.__rdelta
@rdelta.setter
def rdelta(self, value):
self.__rdelta = value
self.__update_dr()
@property
def rmin(self):
"""The :math:`R_{min}` valuefor the :math:`r`-space vector
:getter: Return :math:`R_{min}` value
:setter: Set the :math:`R_{min}` value and update :math:`r`-space vector
via the **dr** attribute
:type: value
"""
return self.__rmin
@rmin.setter
def rmin(self, value):
self.__rmin = value
self.__update_dr()
@property
def rmax(self):
"""The :math:`R_{max}` valuefor the :math:`r`-space vector
:getter: Return :math:`R_{max}` value
:setter: Set the :math:`R_{max}` value and update :math:`r`-space vector
via the **dr** attribute
:type: value
"""
return self.__rmax
@rmax.setter
def rmax(self, value):
self.__rmax = value
self.__update_dr()
@property
def dr(self):
"""The real space function X axis data, :math:`r`-space vector
:getter: Return the :math:`r` vector
:setter: Set the :math:`r` vector
:type: numpy.array
"""
return self.__dr
@dr.setter
def dr(self, r):
self.__dr = r
@property
def density(self):
"""The number density used (atoms/:math:`\\AA^{3}`) used for the
:math:`\\rho_{0}` term in the equations
:getter: Return the density value
:setter: Set the density value
:type: float
"""
return self.__density
@density.setter
def density(self, value):
self.__density = value
@property
def bcoh_sqrd(self):
"""The average coherent scattering length, squared:
:math:`\\langle b_{coh} \\rangle^2` =
:math:`( \\sum_{i} c_{i} b_{coh,i} ) ( \\sum_{i} c_{i} b^*_{coh,i} )`
where the subscript :math:`i` implies for atom type :math:`i`,
:math:`c_{i}` is the concentration of :math:`i`,
:math:`b_{coh,i}` is the coherent scattering length of :math:`i`,
and :math:`b^*_{coh,i}`
is the complex coherent scattering length of :math:`i`
The real part of the :math:`b_{coh,i}` term can be found
from the **Coh b** column of the NIST neutron scattering
length and cross section table found here:
https://www.ncnr.nist.gov/resources/n-lengths/list.html
Units are in :math:`fm` in the table for the :math:`b_{coh,i}` term.
Thus, :math:`\\langle b_{coh} \\rangle^2` has units of :math:`fm^2`
(and what PyStoG expects). NOTE: 100 :math:`fm^2` == 1 :math:`barn`.
:getter: Return the value of :math:`\\langle b_{coh} \\rangle^2`
:setter: Set the value for :math:`\\langle b_{coh} \\rangle^2`
:type: float
"""
# TODO: Add an example with code to demostrate
return self.__bcoh_sqrd
@bcoh_sqrd.setter
def bcoh_sqrd(self, value):
self.__bcoh_sqrd = value
@property
def btot_sqrd(self):
"""The average coherent scattering length, squared:
:math:`\\langle b_{tot}^2 \\rangle`
= :math:`\\sum_{i} c_{i} b_{tot,i}^2`
= :math:`\\frac{1}{4 \\pi} \\sum_i c_{i} \\sigma_{tot,i}`
where the subscript :math:`i` implies for atom type :math:`i`,
:math:`c_{i}` is the concentration of :math:`i` and
:math:`\\sigma_{tot,i}` is the total cross-section of :math:`i`
The real part of the :math:`b_{coh,i}` term can be found
from the **Scatt xs** column of the NIST neutron scattering
length and cross section table found here:
https://www.ncnr.nist.gov/resources/n-lengths/list.html
Units are in :math:`barn` (=100 :math:`fm^2`) in the table
for the :math:`\\sigma_{tot,i}` term. Thus, you must multiply
:math:`\\sum_{i} c_{i} b_{tot,i}^2` by 100 to go from :math:`barn`
to :math:`fm^2` (what PyStoG expects).
:getter: Return the value of :math:`\\sum_{i} c_{i} b_{tot,i}^2`
:setter: Set the value for :math:`\\sum_{i} c_{i} b_{tot,i}^2`
:type: float
"""
# TODO: Add an example with code to demostrate
return self.__btot_sqrd
@btot_sqrd.setter
def btot_sqrd(self, value):
self.__btot_sqrd = value
@property
def stem_name(self):
"""A stem name to prefix for all output files. Replicates
the **stog** Fortran program behavior.
:getter: Return the currently set stem name
:setter: Set the stem name for output files
:type: str
"""
return self.__stem_name
@stem_name.setter
def stem_name(self, name):
self.__stem_name = name
@property
def low_q_correction(self):
"""This sets the option to perform a low-:math:`Q` correction
for the omitted :math:`Q` range.
See :class:`pystog.transformer.Transformer` **_low_x_correction**
method for more information.
:getter: Return bool of applying the low-:math:`Q` correction
:setter: Set whether the correction is applied or not
:type: bool
"""
return self.__low_q_correction
@low_q_correction.setter
def low_q_correction(self, value):
if isinstance(value, bool):
self.__low_q_correction = value
else:
raise TypeError("Expected a bool, True or False")
@property
def lorch_flag(self):
"""This sets the option to perform the Lorch dampening correction
for the :math:`Q` range. Generally, will help reduce Fourier "ripples",
or AKA "Gibbs phenomenon", due to discontinuity at :math:`Q_{max}`
if the reciprocal space function is not at the
:math:`Q -> \\infty` limit.
Yet, will also broaden real space function peaks, possibly dubiously.
See :class:`pystog.transformer.Transformer` **fourier_transform** and
**_low_x_correction** methods for where this is applied.
:getter: Return bool of applying the Lorch dampening correction
:setter: Set whether the correction is applied or not
:type: bool
"""
return self.__lorch_flag
@lorch_flag.setter
def lorch_flag(self, value):
if isinstance(value, bool):
self.__lorch_flag = value
else:
raise TypeError("Expected a bool, True or False")
@property
def fourier_filter_cutoff(self):
"""This sets the cutoff in :math:`r`-space for the Fourier
filter. The minimum is automatically 0.0. Thus, from
0.0 to **fourier_filter_cutoff** is reverse transfomed,
subtracted in reciprocal space, and then the difference
is back-transformed.
See :class:`pystog.fourier_filter.FourierFilter` for more information.
:getter: Return currently set cutoff value
:setter: Set cutoff value
:type: float
"""
return self.__fourier_filter_cutoff
@fourier_filter_cutoff.setter
def fourier_filter_cutoff(self, value):
self.__fourier_filter_cutoff = value
@property
def merged_opts(self):
"""This sets the options to perform after merging the
reciprocal space functions together, such as an
overall offset and scale.
:getter: Return the options currently set
:setter: Set the options for the merged pattern
:type: dict
"""
return self.__merged_opts
@merged_opts.setter
def merged_opts(self, options):
self.__merged_opts = options
# DataFrame attributes
@property
def df_individuals(self):
"""The DataFrame for the input reciprocal space functions
loaded from **files** and with the loading processing
from **add_dataset** class method.
:getter: Returns the current individual, input
reciprocal space functions DataFrame
:setter: Sets the DataFrame
:type: pandas.DataFrame
"""
return self.__df_individuals
@df_individuals.setter
def df_individuals(self, df):
self.__df_individuals = df
@property
def df_sq_individuals(self):
"""The DataFrame for the :math:`S(Q)` generated from each input
reciprocal space dataset in **df_individuals** class DataFrame.
:getter: Returns the current individual :math:`S(Q)`
reciprocal space functions DataFrame
:setter: Sets the DataFrame
:type: pandas.DataFrame
"""
return self.__df_sq_individuals
@df_sq_individuals.setter
def df_sq_individuals(self, df):
self.__df_sq_individuals = df
@property
def df_sq_master(self):
"""The "master" DataFrame for the :math:`S(Q)` reciprocal
space functions that are generated for each processing step.
"""
return self.__df_sq_master
@df_sq_master.setter
def df_sq_master(self, df):
self.__df_sq_master = df
@property
def df_gr_master(self):
"""The "master" DataFrame for the real space functions
that are generated for each processing step.
:getter: Returns the current "master" real space function DataFrame
:setter: Sets the "master" real space function DataFrame
:type: pandas.DataFrame
"""
return self.__df_gr_master
@df_gr_master.setter
def df_gr_master(self, df):
self.__df_gr_master = df
# Visualization attributes
@property
def plot_flag(self):
"""This sets the option for matplotlib to display
diagnostic plots or not.
:getter: Return bool of flag
:setter: Set whether the diagnostics are displayed or not
:type: bool
"""
return self.__plot_flag
@plot_flag.setter
def plot_flag(self, value):
if isinstance(value, bool):
self.__plot_flag = value
else:
raise TypeError("Expected a bool, True or False")
@property
def plotting_kwargs(self):
"""The plot settings for visualization via matplotlib
:getter: Returns the current arguments
:setter: Sets the plotting kwargs
:type: dict
"""
return self.__plotting_kwargs
@plotting_kwargs.setter
def plotting_kwargs(self, kwargs):
self.__plotting_kwargs = kwargs
@property
def real_space_function(self):
"""The real space function to use throughoutt the processing
:getter: Returns the currently select real space function
:setter: Set the selected real space function and
updates other title attributes that rely on this
in their name.
:type: str
"""
return self.__real_space_function
@real_space_function.setter
def real_space_function(self, real_space_function):
if real_space_function not in RealSpaceChoices:
raise ValueError("real_space_function must be of %s" %
','.join(RealSpaceChoices.keys()))
self.__real_space_function = real_space_function
self.__gr_ft_title = "%s FT" % real_space_function
self.__gr_lorch_title = "%s FT Lorched" % real_space_function
self.__gr_title = "%s Merged" % real_space_function
@property
def sq_title(self):
"""The title of the :math:`S(Q)` function directly after merging
the reciprocal space functions without any further corrections.
:getter: Returns the current title for this function
:setter: Sets the title for this function
:type: str
"""
return self.__sq_title
@sq_title.setter
def sq_title(self, title):
self.__sq_title = title
@property
def qsq_minus_one_title(self):
"""The title of the :math:`Q[S(Q)-1]` function
directly after merging the reciprocal space
functions without any further corrections.
:getter: Returns the current title for this function
:setter: Sets the title for this function
:type: str
"""
return self.__qsq_minus_one_title
@qsq_minus_one_title.setter
def qsq_minus_one_title(self, title):
self.__qsq_minus_one_title = title
@property
def sq_ft_title(self):
"""The title of the :math:`S(Q)` function after
merging and a fourier filter correction.
:getter: Returns the current title for this function
:setter: Sets the title for this function
:type: str
"""
return self.__sq_ft_title
@sq_ft_title.setter
def sq_ft_title(self, title):
self.__sq_ft_title = title
@property
def fq_title(self):
"""The title of the :math:`F(Q)` function after
merging and a fourier filter correction.
:getter: Returns the current title for this function
:setter: Sets the title for this function
:type: str
"""
return self.__fq_title
@fq_title.setter
def fq_title(self, title):
self.__fq_title = title
@property
def gr_title(self):
"""The title of the real space function directly after merging
the reciprocal space functions without any further corrections.
:getter: Returns the current title for this function
:setter: Sets the title for this function
:type: str
"""
return self.__gr_title
@gr_title.setter
def gr_title(self, title):
self.__gr_title = title
@property
def gr_ft_title(self):
"""The title for the real space function after both
merging and a fourier filter correction
:getter: Returns the current title for this function
:setter: Sets the title for this function
:type: str
"""
return self.__gr_ft_title
@gr_ft_title.setter
def gr_ft_title(self, title):
self.__gr_ft_title = title
@property
def gr_lorch_title(self):
"""The title for the real space function with the lorch correction
:getter: Returns the current title for this function
:setter: Sets the title for this function
:type: str
"""
return self.__gr_lorch_title
@gr_lorch_title.setter
def gr_lorch_title(self, title):
self.__gr_lorch_title = title
@property
def GKofR_title(self):
"""The title of the :math:`G_{Keen Version}(r)` with
all corrections applied.
:getter: Returns the current title for this function
:setter: Sets the title for this function
:type: str
"""
return self.__GKofR_title
@GKofR_title.setter
def GKofR_title(self, title):
self.__GKofR_title = title
# -------------------------------------#
# Reading and Merging Spectrum
[docs] def read_all_data(self, **kwargs):
"""Reads all the data from the **files** attribute
Uses the **read_dataset** method on each file.
Will append all datasets as DataFrames to the
**df_inviduals** attribute DataFrame
and also convert to :math:`S(Q)` and add to the **df_sq_individuals**
attribute DataFrame in **add_dataset** method
via **read_dataset** method.
"""
assert self.files is not None
assert len(self.files) != 0
for i, file_info in enumerate(self.files):
file_info['index'] = i
self.read_dataset(file_info, **kwargs)
[docs] def read_dataset(
self,
info,
xcol=0,
ycol=1,
sep=r"\s+",
skiprows=2,
**kwargs):
"""Reads an individual file and uses the **add_dataset**
method to apply all dataset manipulations, such as
scales, offsets, cropping, etc.
Will append the DataFrame to the **df_inviduals** attribute DataFrame
and also convert to :math:`S(Q)` and add to the **df_sq_individuals**
attribute DataFrame in **add_dataset** method.
:param info: Dict with information for dataset
(filename, manipulations, etc.)
:type info: dict
:param xcol: The column in the data file that contains the X-axis
:type xcol: int
:param ycol: The column in the data file that contains the Y-axis
:type ycol: int
:param sep: Separator for the file used by pandas.read_csv
:type sep: raw string
:param skiprows: Number of rows to skip. Passed to pandas.read_csv
:type skiprows: int
"""
# TODO: Create a proper parser class so we can be
# more accepting of file formats.
data = pd.read_csv(info['Filename'],
sep=sep,
usecols=[xcol, ycol],
names=['x', 'y'],
skiprows=skiprows,
engine='python',
**kwargs)
info['data'] = data
self.add_dataset(info, index=info['index'], **kwargs)
[docs] def add_dataset(
self,
info,
index=0,
yscale=1.,
yoffset=0.,
xoffset=0.,
ydecimals=16,
**kwargs):
"""Takes the info with the dataset and manipulations,
such as scales, offsets, cropping, etc., and creates
an invidual DataFrame.
Will append the DataFrame to the **df_inviduals** attribute DataFrame
and also convert to :math:`S(Q)` and add to the **df_sq_individuals**
attribute DataFrame.
:param info: Dict with information for dataset
(filename, manipulations, etc.)
:type info: dict
:param index: Index of the added reciprocal space function dataset
:type index: int
:param yscale: Scale factor for the Y data
(i.e. :math:`S(Q)`, :math:`F(Q)`, etc.)
:type yscale: float
:param yoffset: Offset factor for the Y data
(i.e. :math:`S(Q)`, :math:`F(Q)`, etc.)
:type yoffset: float
:param xoffset: Offset factor for the X data (i.e. :math:`Q`)
:type yoffset: float
"""
# Extract data
x = np.around(np.array(info['data']['x']), decimals=self.__xdecimals)
y = np.around(np.array(info['data']['y']), decimals=self.__ydecimals)
# Cropping
xmin = min(x)
xmax = max(x)
if 'Qmin' in info:
xmin = info['Qmin']
if 'Qmax' in info:
xmax = info['Qmax']
x, y, _ = self.transformer.apply_cropping(x, y, xmin, xmax)
# Offset and scale
adjusting = False
if 'Y' in info:
adjusting = True
if 'Scale' in info['Y']:
yscale = info['Y']['Scale']
if 'Offset' in info['Y']:
yoffset = info['Y']['Offset']
if 'X' in info:
adjusting = True
if 'Offset' in info['X']:
xoffset = info['X']['Offset']
if adjusting:
x, y = self._apply_scales_and_offset(
x, y, yscale, yoffset, xoffset)
# Save overal x-axis min and max
self.xmin = min(self.xmin, xmin)
self.xmax = max(self.xmax, xmax)
# Use Qmin and Qmax to crop datasets
if self.qmin is not None:
if self.xmin < self.qmin:
x, y, _ = self.transformer.apply_cropping(
x, y, self.qmin, self.xmax)
if self.qmax is not None:
if self.xmax > self.qmax:
x, y, _ = self.transformer.apply_cropping(
x, y, self.xmin, self.qmax)
# Default to S(Q) if function type not defined
if "ReciprocalFunction" not in info:
info["ReciprocalFunction"] = "S(Q)"
if info["ReciprocalFunction"] not in ReciprocalSpaceChoices:
error = "ReciprocalFunction was equal to {given}.\n"
error += "ReciprocalFunction must be one of the folloing {options}"
error = error.format(
given=info["ReciprocalFunction"],
options=json.dumps(ReciprocalSpaceChoices))
raise ValueError(error)
# Save reciprocal space function to the "invididuals" DataFrame
df = pd.DataFrame(
y, columns=[
'%s_%d' %
(info['ReciprocalFunction'], index)], index=x)
self.df_individuals = pd.concat([self.df_individuals, df], axis=1)
# Convert to S(Q) and save to the individual S(Q) DataFrame
if info["ReciprocalFunction"] == "Q[S(Q)-1]":
y, dy = self.converter.F_to_S(x, y)
elif info["ReciprocalFunction"] == "FK(Q)":
y, dy = self.converter.FK_to_S(
x, y, **{'<b_coh>^2': self.bcoh_sqrd})
elif info["ReciprocalFunction"] == "DCS(Q)":
y, dy = self.converter.DCS_to_S(x, y,
**{'<b_coh>^2': self.bcoh_sqrd,
'<b_tot^2>': self.btot_sqrd})
df = pd.DataFrame(y, columns=['S(Q)_%d' % index], index=x)
self.df_sq_individuals = pd.concat(
[self.df_sq_individuals, df], axis=1)
def _apply_scales_and_offset(
self,
x,
y,
yscale=1.0,
yoffset=0.0,
xoffset=0.0):
"""Applies scales to the Y-axis and offsets to both X and Y axes.
:param x: X-axis data
:type x: numpy.array or list
:param y: Y-axis data
:type y: numpy.array or list
:param yscale: Y-axis scale factor
:type yscale: float
:param yoffset: Y-axis offset factor
:type yoffset: float
:param xoffset: X-axis offset factor
:type xoffset: float
:return: X and Y vectors after scales and offsets applied
:rtype: numpy.array pair
"""
y = self._scale(y, yscale)
y = self._offset(y, yoffset)
x = self._offset(x, xoffset)
return x, y
def _offset(self, data, offset):
"""Applies offset to data
:param data: Input data
:type data: numpy.array or list
:param offset: Offset to apply to data
:type offset: float
:return: Data with offset applied
:rtype: numpy.array
"""
data = data + offset
return data
def _scale(self, data, scale):
"""Applies scale to data
:param data: Input data
:type data: numpy.array or list
:param offset: Scale to apply to data
:type offset: float
:return: Data with scale applied
:rtype: numpy.array
"""
data = scale * data
return data
[docs] def merge_data(self):
"""Merges the reciprocal space data stored in the
**df_individuals** class DataFrame into a single, merged
recirocal space function. Stores the S(Q) result in
**df_sq_master** class DataFrame.
Also, converts this
merged :math:`S(Q)` into :math:`Q[S(Q)-1]` via the
**Converter** class and applies any modification
specified in **merged_opts** dict attribute, specified
by the **'Q[S(Q)-1]'** key of the dict. If there is modification,
this modified :math:`Q[S(Q)-1]` will be converted to
:math:`S(Q)` and replace the :math:`S(Q)` directly after merge.
Example dict of **merged_opts** for scaling of
:math:`S(Q)` by 2 and then offsetting :math:`Q[S(Q)-1]` by 5:
.. highlight:: python
.. code-block:: python
{"Merging": { "Y": { "Offset": 0.0,
"Scale": 2.0 },
"Q[S(Q)-1]": { "Y": "Offset": 5.0,
"Scale": 1.0 }
}
...
"""
# TODO: Refator to have "S(Q)" as key for S(Q) modifications
# Sum over single S(Q) columns into a merged S(Q)
single_sofqs = self.df_sq_individuals.iloc[:, :]
self.df_sq_master[self.sq_title] = single_sofqs.mean(axis=1)
q = self.df_sq_master[self.sq_title].index.values
sq = self.df_sq_master[self.sq_title].values
q, sq = self._apply_scales_and_offset(q, sq,
self.merged_opts['Y']['Scale'],
self.merged_opts['Y']['Offset'],
0.0)
self.df_sq_master[self.sq_title] = sq
# Also, create merged Q[S(Q)-1] with modifications, if specified
fofq, dfofq = self.converter.S_to_F(q, sq)
if "Q[S(Q)-1]" in self.merged_opts:
fofq_opts = self.merged_opts["Q[S(Q)-1]"]
if "Y" in fofq_opts:
if "Scale" in fofq_opts["Y"]:
fofq *= fofq_opts["Y"]["Scale"]
if "Offset" in fofq_opts["Y"]:
fofq += fofq_opts["Y"]["Offset"]
self.df_sq_master[self.qsq_minus_one_title] = fofq
# Convert this Q[S(Q)-1] back to S(Q) and overwrite the 1st one
sq, dsq = self.converter.F_to_S(q, fofq)
sq[np.isnan(sq)] = 0
self.df_sq_master[self.sq_title] = sq
# -------------------------------------#
# Transform Utilities
[docs] def fourier_filter(self):
"""Performs the Fourier filter on the **df_sq_master**
DataFrame to generate the desired real space function
with this correction. The results from both reciprocal space and
real space are:
1. Saved back to the respective "master" DataFrames
2. Saved to files via the **stem_name**
3. (optional) Plotted for diagnostics
4. Returned from function
:return: Returns a tuple with :math:`r`,
the selected real space function,
:math:`Q`,
and :math:`S(Q)` functions
:rtype: tuple of numpy.array
"""
kwargs = {'lorch': False,
'rho': self.density,
'<b_coh>^2': self.bcoh_sqrd,
'OmittedXrangeCorrection': self.low_q_correction
}
cutoff = self.fourier_filter_cutoff
# Get reciprocal and real space data
if self.gr_title not in self.df_gr_master.columns:
msg = (
"WARNING: Fourier filtered before initial transform. "
"Peforming now...")
print(msg)
self.transform_merged()
r = self.df_gr_master[self.gr_title].index.values
gr = self.df_gr_master[self.gr_title].values
q = self.df_sq_master[self.sq_title].index.values
sq = self.df_sq_master[self.sq_title].values
# Fourier filter g(r)
# NOTE: Real space function setter will catch ValueError so
# so no need for `else` to catch error
if self.real_space_function == "g(r)":
q_ft, sq_ft, q, sq, r, gr, _, _, _ = self.filter.g_using_S(
r, gr, q, sq, cutoff, **kwargs)
elif self.real_space_function == "G(r)":
q_ft, sq_ft, q, sq, r, gr, _, _, _ = self.filter.G_using_S(
r, gr, q, sq, cutoff, **kwargs)
elif self.real_space_function == "GK(r)":
q_ft, sq_ft, q, sq, r, gr, _, _, _ = self.filter.GK_using_S(
r, gr, q, sq, cutoff, **kwargs)
# Round to avoid mismatch index in DataFrame and NaN for column values
q = np.around(q, decimals=self.__xdecimals)
sq = np.around(sq, decimals=self.__ydecimals)
q_ft = np.around(q_ft, decimals=self.__xdecimals)
sq_ft = np.around(sq_ft, decimals=self.__ydecimals)
# Add output to master dataframes and write files
self.df_sq_master = self.add_to_dataframe(
q_ft, sq_ft, self.df_sq_master, self._ft_title)
self.write_out_ft()
self.df_sq_master = self.add_to_dataframe(
q, sq, self.df_sq_master, self.sq_ft_title)
self.write_out_ft_sq()
self.df_gr_master = self.add_to_dataframe(
r, gr, self.df_gr_master, self.gr_ft_title)
self.write_out_ft_gr()
# Plot results
if self.plot_flag:
exclude_list = [self.qsq_minus_one_title, self.sq_ft_title]
self.plot_sq(
ylabel="FourierFilter(Q)",
title="Fourier Transform of the low-r region below cutoff",
exclude_list=exclude_list)
exclude_list = [self.qsq_minus_one_title]
self.plot_sq(
title="Fourier Filtered S(Q)",
exclude_list=exclude_list)
self.plot_gr(
title="Fourier Filtered %s" %
self.real_space_function)
return q, sq, r, gr
[docs] def apply_lorch(self, q, sq, r):
"""Performs the Fourier transform using the Lorch
dampening correction on the merged :math:`S(Q)` from
the **df_sq_master** DataFrame to generate the
desired real space function with
this correction. The results from both reciprocal space and
real space are:
1. Saved back to the respective "master" DataFrames
2. Saved to files via the **stem_name**
3. (optional) Plotted for diagnostics
4. Returned from function
:param q: :math:`Q`-space vector
:type q: numpy.array or list
:param sq: :math:`S(Q)` vector
:type sq: numpy.array or list
:param r: :math:`r`-space vector
:type r: numpy.array or list
:return: Returns a tuple with :math:`r` and selected real space function
:rtype: tuple of numpy.array
"""
if self.real_space_function == "g(r)":
r, gr_lorch, _ = self.transformer.S_to_g(
q, sq, r, **{'lorch': True, 'rho': self.density})
elif self.real_space_function == "G(r)":
r, gr_lorch, _ = self.transformer.S_to_G(
q, sq, r, **{'lorch': True})
elif self.real_space_function == "GK(r)":
r, gr_lorch, _ = self.transformer.S_to_GK(
q, sq, r,
**{
'lorch': True,
'rho': self.density,
'<b_coh>^2': self.bcoh_sqrd
})
self.df_gr_master = self.add_to_dataframe(
r, gr_lorch, self.df_gr_master, self.gr_lorch_title)
self.write_out_lorched_gr()
if self.plot_flag:
self.plot_gr(
title="Lorched %s" %
self.real_space_function)
return r, gr_lorch
def _get_lowR_mean_square(self):
"""Retuns the low-R mean square value for the real space function stored
in the "master" real space function class DataFrame, **df_gr_master**.
Used as a cost function for optimiziation of the :math:`Q_{max}` value
by an iterative adjustment. Calls **_lowR_mean_square* method.
**Currently not used in PyStoG workflow since was done manually.**
:return: The calculated low-R mean-square value
:rtype: float
"""
# TODO: Automate the :math:`Q_{max}` adjustment in an iterative loop
# using a minimizer.
gr = self.df_gr_master[self.gr_title].values
return self._lowR_mean_square(self.dr, gr)
def _lowR_mean_square(self, r, gr, limit=1.01):
"""Calculates the low-R mean square value from a given real space function.
Used as a cost function for optimiziation of the :math:`Q_{max}` value
by an iterative adjustment.
**Currently not used in PyStoG workflow since was done manually.**
:param r: :math:`r`-space vector
:type r: numpy.array or list
:param gr: real space function vector
:type gr: numpy.array or list
:param limit: The upper limit on :math:`r` to use for
the mean-square calculation
:type limit: float
:return: The calculated low-R mean-square value
:rtype: float
"""
# TODO: Automate the :math:`Q_{max}` adjustment in an iterative loop
# using a minimizer.
gr = gr[r <= limit]
gr_sq = np.multiply(gr, gr)
average = sum(gr_sq)
return np.sqrt(average)
def _add_keen_fq(self, q, sq):
"""Adds the Keen version of :math:`F(Q)` to the
"master" recprical space DataFrame, **df_sq_master**, and
writes it out to file using the **stem_name**.
:param q: :math:`Q`-space vector
:type q: numpy.array or list
:param sq: :math:`S(Q)` vector
:type sq: numpy.array or list
"""
kwargs = {'rho': self.density, "<b_coh>^2": self.bcoh_sqrd}
fq, dfq = self.converter.S_to_FK(q, sq, **kwargs)
self.df_sq_master = self.add_to_dataframe(
q, fq, self.df_sq_master, self.fq_title)
self.write_out_rmc_fq()
def _add_keen_gr(self, r, gr):
"""Adds the Keen version of :math:`G(r)` to the
"master" real space DataFrame, **df_gr_master**, and
writes it out to file using the **stem_name**.
:param r: :math:`r`-space vector
:type r: numpy.array or list
:param gr: real space function vector
:type gr: numpy.array or list
"""
kwargs = {'rho': self.density, "<b_coh>^2": self.bcoh_sqrd}
if self.real_space_function == "g(r)":
GKofR, dGKofR = self.converter.g_to_GK(r, gr, **kwargs)
elif self.real_space_function == "G(r)":
GKofR, dGKofR = self.converter.G_to_GK(r, gr, **kwargs)
elif self.real_space_function == "GK(r)":
GKofR = gr
self.df_gr_master = self.add_to_dataframe(
r, GKofR, self.df_gr_master, self.GKofR_title)
self.write_out_rmc_gr()
# -------------------------------------#
# Plot Utilities
def _plot_df(self, df, xlabel, ylabel, title, exclude_list):
"""Utility function to help plot a DataFrame
:param df: DataFrame to plot
:type df: pandas.DataFrame
:param xlabel: X-axis label
:type xlabel: str
:param ylabel: Y-axis label
:type ylabel: str
:param title: Title of plot
:type title: str
:param exclude_list: List of titles of columns in
DataFrame **df** to exclude from plot
:type exclude_list: list of str
"""
if exclude_list:
columns_diff = df.columns.difference(exclude_list)
columns_diff_ids = df.columns.get_indexer(columns_diff)
df = df.iloc[:, columns_diff_ids]
df.plot(**self.plotting_kwargs)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.title(title)
plt.show()
[docs] def plot_sq(self, xlabel='Q', ylabel='S(Q)', title='', exclude_list=None):
"""Helper function to plot the :math:`S(Q)` functions
in the "master" DataFrame, **df_sq_master**.
:param xlabel: X-axis label
:type xlabel: str
:param ylabel: Y-axis label
:type ylabel: str
:param title: Title of plot
:type title: str
:param exclude_list: List of titles of columns in
DataFrame to exclude from plot
:type exclude_list: list of str
"""
df_sq = self.df_sq_master
self._plot_df(df_sq, xlabel, ylabel, title, exclude_list)
[docs] def plot_merged_sq(self):
"""Helper function to multiplot the individual
real space functions in the **df_individuals** DataFrame,
these functions as individual :math:`S(Q)`, the merged
:math:`S(Q)` from the individual functions, and
:math:`Q[S(Q)-1]`.
"""
plot_kwargs = self.plotting_kwargs.copy()
plot_kwargs['style'] = 'o-'
plot_kwargs['lw'] = 0.5
fig, axes = plt.subplots(2, 2, sharex=True)
plt.xlabel("Q")
# Plot the inividual reciprocal functions
if self.df_individuals.empty:
self.df_sq_individuals.plot(ax=axes[0, 0], **plot_kwargs)
else:
self.df_individuals.plot(ax=axes[0, 0], **plot_kwargs)
# Plot the inividual S(Q) functions
self.df_sq_individuals.plot(ax=axes[0, 1], **plot_kwargs)
axes[0, 1].set_ylabel("S(Q)")
axes[0, 1].set_title("Individual S(Q)")
# Plot the merged S(Q)
df_sq = self.df_sq_master.loc[:, [self.sq_title]]
df_sq.plot(ax=axes[1, 0], **plot_kwargs)
axes[1, 0].set_title("Merged S(Q)")
axes[1, 0].set_ylabel("S(Q)")
# Plot the merged Q[S(Q)-1]
df_fq = self.df_sq_master.loc[:, [self.qsq_minus_one_title]]
df_fq.plot(ax=axes[1, 1], **plot_kwargs)
axes[1, 1].set_title("Merged Q[S(Q)-1]")
axes[1, 1].set_ylabel("Q[S(Q)-1]")
plt.show()
[docs] def plot_gr(self, xlabel='r', ylabel='G(r)', title='', exclude_list=None):
"""Helper function to plot the real space functions
in the "master" DataFrame, **df_gr_master**.
:param xlabel: X-axis label
:type xlabel: str
:param ylabel: Y-axis label
:type ylabel: str
:param title: Title of plot
:type title: str
:param exclude_list: List of titles of columns in
DataFrame to exclude from plot
:type exclude_list: list of str
"""
df_gr = self.df_gr_master
self._plot_df(df_gr, xlabel, ylabel, title, exclude_list)
[docs] def plot_summary_sq(self):
"""Helper function to multiplot the reciprocal space
functions during processing and the :math:`F(Q)` function.
"""
if self.fq_title not in self.df_sq_master.columns:
q = self.df_sq_master[self.sq_title].index.values
sq = self.df_sq_master[self.sq_title].values
self._add_keen_fq(q, sq)
fig, (ax1, ax2) = plt.subplots(1, 2, sharey=True)
exclude_list = [self.fq_title]
df = self.df_sq_master
columns_diff = df.columns.difference(exclude_list)
columns_diff_ids = df.columns.get_indexer(columns_diff)
df_sq = self.df_sq_master.iloc[:, columns_diff_ids]
df_sq.plot(ax=ax1, **self.plotting_kwargs)
df_fq = self.df_sq_master.loc[:, [self.fq_title]]
df_fq.plot(ax=ax2, **self.plotting_kwargs)
plt.xlabel("Q")
ax1.set_ylabel("S(Q)")
ax1.set_title("StoG S(Q) functions")
ax2.set_ylabel("FK(Q)")
ax2.set_title("Keen's F(Q)")
plt.show()
[docs] def plot_summary_gr(self):
"""Helper function to multiplot the real space
functions during processing
and the :math:`G_{Keen Version}(Q)` function.
"""
if self.GKofR_title not in self.df_gr_master.columns:
r = self.df_gr_master[self.gr_title].index.values
gr = self.df_gr_master[self.gr_title].values
self._add_keen_gr(r, gr)
fig, (ax1, ax2) = plt.subplots(1, 2, sharey=True)
df = self.df_gr_master
columns_diff = df.columns.difference([self.GKofR_title])
columns_diff_ids = df.columns.get_indexer(columns_diff)
df_gr = df.iloc[:, columns_diff_ids]
df_gr.plot(ax=ax1, **self.plotting_kwargs)
df_gk = df.loc[:, [self.GKofR_title]]
df_gk.plot(ax=ax2, **self.plotting_kwargs)
plt.xlabel("r")
ax1.set_ylabel(self.real_space_function)
ax1.set_title("StoG %s functions" % self.real_space_function)
ax2.set_ylabel("GK(r)")
ax2.set_title("Keen's G(r)")
plt.show()
# -------------------------------------#
# Output Utilities
[docs] def add_to_dataframe(self, x, y, df, title):
"""Takes X,Y dataset and adds it to the given Datframe **df**,
with the given **title**. Utility function for updating
the class DataFrames.
:param x: X-axis vector
:type x: numpy.array or list
:param y: Y-axis vector
:type y: numpy.array or list
:param df: DataFrame to append (**x**, **y**) pair to as a column
:type df: pandas.DataFrame
:param title: The title of the column in the DataFrame
:type title: str
:return: DataFrame with X,Y data appended with given title
:rtype: pandas.DataFrame
"""
df_temp = pd.DataFrame(y, columns=[title], index=x)
if title in df.columns:
df[title] = df_temp[title]
return df
df = pd.concat([df, df_temp], axis=1)
return df
def _write_out_df(self, df, cols, filename):
"""Helper function for writing out the DataFrame **df**
and the given columns, **cols**, to the filename in
the RMCProfile format.
:param df: DataFrame to write from to filename
:type df: pandas.DataFrame
:param cols: Column title list for columns to write out
:type cols: List of str
:param filename: Filename to write to
:type filename: str
"""
if df.empty:
raise ValueError("Empty dataframe. Cannot write out.")
with open(filename, 'w') as f:
f.write("%d \n" % df.shape[0])
f.write("# Comment line\n")
with open(filename, 'a') as f:
df.to_csv(f, sep='\t', columns=cols, header=False)
[docs] def write_out_merged_sq(self, filename=None):
"""Helper function for writing out the merged :math:`S(Q)`
:param filename: Filename to write to
:type filename: str
"""
if filename is None:
filename = "%s.sq" % self.stem_name
self._write_out_df(self.df_sq_master, [self.sq_title], filename)
[docs] def write_out_merged_gr(self, filename=None):
"""Helper function for writing out the merged real space function
:param filename: Filename to write to
:type filename: str
"""
if filename is None:
filename = "%s.gr" % self.stem_name
self._write_out_df(self.df_gr_master, [self.gr_title], filename)
[docs] def write_out_ft(self, filename=None):
"""Helper function for writing out the Fourier filter correction.
:param filename: Filename to write to
:type filename: str
"""
if filename is None:
filename = "ft.dat"
self._write_out_df(self.df_sq_master, [self._ft_title], filename)
[docs] def write_out_ft_sq(self, filename=None):
"""Helper function for writing out the Fourier filtered :math:`S(Q)`
:param filename: Filename to write to
:type filename: str
"""
if filename is None:
filename = "%s_ft.sq" % self.stem_name
self._write_out_df(self.df_sq_master, [self.sq_ft_title], filename)
[docs] def write_out_ft_gr(self, filename=None):
"""Helper function for writing out the Fourier filtered real space function
:param filename: Filename to write to
:type filename: str
"""
if filename is None:
filename = "%s_ft.gr" % self.stem_name
self._write_out_df(self.df_gr_master, [self.gr_ft_title], filename)
[docs] def write_out_lorched_gr(self, filename=None):
"""Helper function for writing out the Lorch dampened real space function
:param filename: Filename to write to
:type filename: str
"""
if filename is None:
filename = "%s_ft_lorched.gr" % self.stem_name
self._write_out_df(self.df_gr_master, [self.gr_lorch_title], filename)
[docs] def write_out_rmc_fq(self, filename=None):
"""Helper function for writing out the output :math:`F(Q)`
:param filename: Filename to write to
:type filename: str
"""
if filename is None:
filename = "%s_rmc.fq" % self.stem_name
self._write_out_df(self.df_sq_master, [self.fq_title], filename)
[docs] def write_out_rmc_gr(self, filename=None):
"""Helper function for writing out the output :math:`G_{Keen Version}(Q)`
:param filename: Filename to write to
:type filename: str
"""
if filename is None:
filename = "%s_rmc.gr" % self.stem_name
self._write_out_df(self.df_gr_master, [self.GKofR_title], filename)