From 66e1bf34ff407b29fff80e582282073f73e77347 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Fri, 9 Sep 2022 11:35:54 -0500 Subject: [PATCH 01/20] Stage initial script for MPI implementation --- Scripts/mpi_mapSpecParametricScan.py | 136 +++++++++++++++++++++++++++ requirments.txt | 4 + 2 files changed, 140 insertions(+) create mode 100644 Scripts/mpi_mapSpecParametricScan.py create mode 100644 requirments.txt diff --git a/Scripts/mpi_mapSpecParametricScan.py b/Scripts/mpi_mapSpecParametricScan.py new file mode 100644 index 0000000..1bda687 --- /dev/null +++ b/Scripts/mpi_mapSpecParametricScan.py @@ -0,0 +1,136 @@ +''' + Copyright (c) 2017, UChicago Argonne, LLC + See LICENSE file. +''' +''' +Script to process a dataset using the Sector33SpecDataSource. This processes +a parametric scan where each line of the scan represents a different sample +environment parameter with all angles constant. +''' + +import os +import numpy as np +import xrayutilities as xu +from rsMap3D.datasource.Sector33SpecDataSource import Sector33SpecDataSource +from rsMap3D.datasource.DetectorGeometryForXrayutilitiesReader import DetectorGeometryForXrayutilitiesReader as detReader +from rsMap3D.utils.srange import srange +from rsMap3D.config.rsmap3dconfigparser import RSMap3DConfigParser +from rsMap3D.constants import ENERGY_WAVELENGTH_CONVERT_FACTOR +from rsMap3D.mappers.gridmapper import QGridMapper +from rsMap3D.gui.rsm3dcommonstrings import BINARY_OUTPUT +from rsMap3D.transforms.unitytransform3d import UnityTransform3D +from rsMap3D.mappers.output.vtigridwriter import VTIGridWriter +import json + +def updateDataSourceProgress(value1, value2): + print("DataSource Progress %s/%s" % (value1, value2)) + +def updateMapperProgress(value1): + print("Mapper Progress %s" % (value1)) + + +with open('config.json', 'r') as config_f: + config = json.load(config_f) + + + +# +#Change values listed here for a particular experiment +# +roi = None # defined later +projectDir = config["project_dir"] +specFile = config["spec_file"] + +scanList1 = ["172-173"] +#scanList2 = ['45-69',] + +mapHKL = True + +configDir = projectDir +detectorConfigName = os.path.join(configDir, config["detector_config"]) +instConfigName = os.path.join(configDir, config["instrument_config"]) +#instConfigName = os.path.join(configDir, "6IDB_Instrument_zPrimary.xml") +#badPixelFile = os.path.join(configDir, None) +if not os.path.exists(detectorConfigName): + raise Exception("Detector Config file does not exist: %s" % + detectorConfigName) +if not os.path.exists(instConfigName): + raise Exception("Instrument Config file does not exist: %s" % + instConfigName) +# if not os.path.exists(badPixelFile): +# raise Exception("Bad Pixel file does not exist: %s" % +# badPixelFile) + +dReader = detReader(detectorConfigName) + +#detectorSettings +detectorName = "Pilatus" +# set to reduce data by averaging pixels 1,1 produces no averaging of pixels, +# uses the original data +bin = [1,1] +# Region of interesting +# set explicitly since the images are cropped by ROI +# COMMENT OUT roi or set to none to simply grab size from the calib file +roi = [1, 487, 1, 195] +# or get info from the config +if roi is None: + detector = dReader.getDetectorById(detectorName) + nPixels = dReader.getNpixels(detector) + roi = [1, nPixels[0], 1, nPixels[1]] +print ("ROI: %s " % roi) + +#output info = +nx = 300 +ny = 300 +nz = 10 +specName, specExt = os.path.splitext(specFile) + +# note that the q ranges in the three dimensions are available +# later. If you know resolution desired, it is possible to calculate more +# appropriate nx, ny, nz + +appConfig = RSMap3DConfigParser() +maxImageMemory = appConfig.getMaxImageMemory() +for scans in scanList1: + scanRange = srange(scans).list() + print ("scanRange %s" % scanRange) + print("specName, specExt: %s, %s" % (specName, specExt)) + ds = Sector33SpecDataSource(projectDir, specName, specExt, + instConfigName, detectorConfigName, roi=roi, + pixelsToAverage=bin, scanList= scanRange, +# badPixelFile=badPixelFile, + appConfig=appConfig) + ds.setCurrentDetector(detectorName) + ds.setProgressUpdater(updateDataSourceProgress) + ds.loadSource(mapHKL=mapHKL) + ds.setRangeBounds(ds.getOverallRanges()) + imageToBeUsed = ds.getImageToBeUsed() + + print("imageToBeUsed %s" % imageToBeUsed) +# wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] + imageSize = np.prod(ds.getDetectorDimensions()) + print(scanRange[0]) + for imageInScan in range(1, len(imageToBeUsed[scanRange[0]])+1): + print ("Scan Line %d" % imageInScan) + + outputFileName = os.path.join(specName + \ + ('_N%d.vti' % imageInScan)) + gridWriter = VTIGridWriter() + + tmpImageUsed = imageToBeUsed[scanRange[0]] + savImageUsed = imageToBeUsed[scanRange[0]] + tmpImageUsed[imageInScan] = False + ds.imageToBeUsed[scanRange[0]] = [not i for i in tmpImageUsed] + gridMapper = QGridMapper(ds, + outputFileName, + outputType=BINARY_OUTPUT, + transform=UnityTransform3D(), + gridWriter=gridWriter, + appConfig=appConfig, + nx=nx, ny=ny, nz=nz) + + gridMapper.setProgressUpdater(updateMapperProgress) + gridMapper.doMap() + ds.imageToBeUsed[scanRange[0]] = savImageUsed + + \ No newline at end of file diff --git a/requirments.txt b/requirments.txt new file mode 100644 index 0000000..af47485 --- /dev/null +++ b/requirments.txt @@ -0,0 +1,4 @@ +PyQt5==5.15.6 +xrayutilities==1.7.3 +spec2nexus==2021.1.11 +vtk==9.1.0 \ No newline at end of file From 0c377da57ce86c1d760dbfd2151233111818f521 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Fri, 9 Sep 2022 14:55:40 -0500 Subject: [PATCH 02/20] Add initial MPI scripts --- Scripts/mpiMapSpecAngleScan.py | 124 +++++++++++++++++++++++++ Scripts/profileMapSpecAngleScan.py | 117 +++++++++++++++++++++++ rsMap3D/mappers/mpigridmapper.py | 143 +++++++++++++++++++++++++++++ 3 files changed, 384 insertions(+) create mode 100644 Scripts/mpiMapSpecAngleScan.py create mode 100644 Scripts/profileMapSpecAngleScan.py create mode 100644 rsMap3D/mappers/mpigridmapper.py diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py new file mode 100644 index 0000000..c66eab6 --- /dev/null +++ b/Scripts/mpiMapSpecAngleScan.py @@ -0,0 +1,124 @@ +''' + Copyright (c) 2017, UChicago Argonne, LLC + See LICENSE file. +''' +''' +Script to process a dataset using the Sector33SpecDataSource +''' + +import os +import numpy as np +import xrayutilities as xu +import json +import datetime +from rsMap3D.datasource.Sector33SpecDataSource import Sector33SpecDataSource +from rsMap3D.datasource.DetectorGeometryForXrayutilitiesReader import DetectorGeometryForXrayutilitiesReader as detReader +from rsMap3D.utils.srange import srange +from rsMap3D.config.rsmap3dconfigparser import RSMap3DConfigParser +from rsMap3D.constants import ENERGY_WAVELENGTH_CONVERT_FACTOR +from rsMap3D.mappers.mpigridmapper import MPIQGridMapper +from rsMap3D.gui.rsm3dcommonstrings import BINARY_OUTPUT +from rsMap3D.transforms.unitytransform3d import UnityTransform3D +from rsMap3D.mappers.output.vtigridwriter import VTIGridWriter + +from mpi4py import MPI +from mpi4py.futures import MPICommExecutor + +mpi_comm = MPI.COMM_WORLD +mpi_size = mpi_comm.Get_size() +mpi_rank = mpi_comm.Get_rank() + +def updateDataSourceProgress(value1, value2): + print("DataSource Progress %s/%s" % (value1, value2)) + +def updateMapperProgress(value1): + print("Mapper Progress %s" % (value1)) + +with open('config.json', 'r') as config_f: + config = json.load(config_f) + +with open('time.log', 'a') as time_log: + time_log.write(f'Start: {datetime.datetime.now()}\n') + + +# +#Change values listed here for a particular experiment +# +roi = None # defined later +projectDir = config["project_dir"] +specFile = config["spec_file"] +scanRange = srange(config["scan_range"]).list() + + + +# +#Change values listed here for a particular experiment +# +mapHKL = False +configDir = projectDir +detectorConfigName = os.path.join(configDir, config["detector_config"]) +instConfigName = os.path.join(configDir, config["instrument_config"]) +if not os.path.exists(detectorConfigName): + raise Exception("Detector Config file does not exist: %s" % + detectorConfigName) +if not os.path.exists(instConfigName): + raise Exception("Instrument Config file does not exist: %s" % + instConfigName) + +dReader = detReader(detectorConfigName) + +#detectorSettings +detectorName = "Pilatus" +# set to reduce data by averaging pixels 1,1 produces no averaging of pixels, +# uses the original data +bin = [1,1] +# Region of interesting +# set explicitly since the images are cropped by ROI +# COMMENT OUT roi or set to none to simply grab size from the calib file +roi = [1, 487, 1, 195] +# or get info from the config +if roi is None: + detector = dReader.getDetectorById(detectorName) + nPixels = dReader.getNpixels(detector) + roi = [1, nPixels[0], 1, nPixels[1]] +print ("ROI: %s " % roi) + +#output info = +nx = 200 +ny = 200 +nz = 200 +specName, specExt = os.path.splitext(specFile) +outputFileName = os.path.join(specName + '.vti') +# note that the q ranges in the three dimensions are available +# later. If you know resolution desired, it is possible to calculate more +# appropriate nx, ny, nz + +appConfig = RSMap3DConfigParser() +maxImageMemory = appConfig.getMaxImageMemory() +print ("scanRange %s" % scanRange) +print("specName, specExt: %s, %s" % (specName, specExt)) +ds = Sector33SpecDataSource(projectDir, specName, specExt, + instConfigName, detectorConfigName, roi=roi, + pixelsToAverage=bin, scanList= scanRange, + appConfig=appConfig) +ds.setCurrentDetector(detectorName) +ds.setProgressUpdater(updateDataSourceProgress) +ds.loadSource(mapHKL=mapHKL) +ds.setRangeBounds(ds.getOverallRanges()) +imageToBeUsed = ds.getImageToBeUsed() +print("imageToBeUsed %s" % imageToBeUsed) +# wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] +imageSize = np.prod(ds.getDetectorDimensions()) + +gridMapper = MPIQGridMapper(ds, + outputFileName, + outputType=BINARY_OUTPUT, + transform=UnityTransform3D(), + gridWriter=VTIGridWriter(), + appConfig=appConfig) + +gridMapper.setProgressUpdater(updateMapperProgress) +gridMapper.doMap() + +with open('time.log', 'a') as time_log: + time_log.write(f'End: {datetime.datetime.now()}\n') \ No newline at end of file diff --git a/Scripts/profileMapSpecAngleScan.py b/Scripts/profileMapSpecAngleScan.py new file mode 100644 index 0000000..82e6256 --- /dev/null +++ b/Scripts/profileMapSpecAngleScan.py @@ -0,0 +1,117 @@ +''' + Copyright (c) 2017, UChicago Argonne, LLC + See LICENSE file. +''' +''' +Script to process a dataset using the Sector33SpecDataSource +''' + +import os +import numpy as np +import xrayutilities as xu +import json +import datetime +from rsMap3D.datasource.Sector33SpecDataSource import Sector33SpecDataSource +from rsMap3D.datasource.DetectorGeometryForXrayutilitiesReader import DetectorGeometryForXrayutilitiesReader as detReader +from rsMap3D.utils.srange import srange +from rsMap3D.config.rsmap3dconfigparser import RSMap3DConfigParser +from rsMap3D.constants import ENERGY_WAVELENGTH_CONVERT_FACTOR +from rsMap3D.mappers.gridmapper import QGridMapper +from rsMap3D.gui.rsm3dcommonstrings import BINARY_OUTPUT +from rsMap3D.transforms.unitytransform3d import UnityTransform3D +from rsMap3D.mappers.output.vtigridwriter import VTIGridWriter + +def updateDataSourceProgress(value1, value2): + print("DataSource Progress %s/%s" % (value1, value2)) + +def updateMapperProgress(value1): + print("Mapper Progress %s" % (value1)) + +with open('config.json', 'r') as config_f: + config = json.load(config_f) + +with open('time.log', 'a') as time_log: + time_log.write(f'Start: {datetime.datetime.now()}\n') + + +# +#Change values listed here for a particular experiment +# +roi = None # defined later +projectDir = config["project_dir"] +specFile = config["spec_file"] +scanRange = srange(config["scan_range"]).list() + + + +# +#Change values listed here for a particular experiment +# +mapHKL = False +configDir = projectDir +detectorConfigName = os.path.join(configDir, config["detector_config"]) +instConfigName = os.path.join(configDir, config["instrument_config"]) +if not os.path.exists(detectorConfigName): + raise Exception("Detector Config file does not exist: %s" % + detectorConfigName) +if not os.path.exists(instConfigName): + raise Exception("Instrument Config file does not exist: %s" % + instConfigName) + +dReader = detReader(detectorConfigName) + +#detectorSettings +detectorName = "Pilatus" +# set to reduce data by averaging pixels 1,1 produces no averaging of pixels, +# uses the original data +bin = [1,1] +# Region of interesting +# set explicitly since the images are cropped by ROI +# COMMENT OUT roi or set to none to simply grab size from the calib file +roi = [1, 487, 1, 195] +# or get info from the config +if roi is None: + detector = dReader.getDetectorById(detectorName) + nPixels = dReader.getNpixels(detector) + roi = [1, nPixels[0], 1, nPixels[1]] +print ("ROI: %s " % roi) + +#output info = +nx = 200 +ny = 200 +nz = 200 +specName, specExt = os.path.splitext(specFile) +outputFileName = os.path.join(specName + '.vti') +# note that the q ranges in the three dimensions are available +# later. If you know resolution desired, it is possible to calculate more +# appropriate nx, ny, nz + +appConfig = RSMap3DConfigParser() +maxImageMemory = appConfig.getMaxImageMemory() +print ("scanRange %s" % scanRange) +print("specName, specExt: %s, %s" % (specName, specExt)) +ds = Sector33SpecDataSource(projectDir, specName, specExt, + instConfigName, detectorConfigName, roi=roi, + pixelsToAverage=bin, scanList= scanRange, + appConfig=appConfig) +ds.setCurrentDetector(detectorName) +ds.setProgressUpdater(updateDataSourceProgress) +ds.loadSource(mapHKL=mapHKL) +ds.setRangeBounds(ds.getOverallRanges()) +imageToBeUsed = ds.getImageToBeUsed() +print("imageToBeUsed %s" % imageToBeUsed) +# wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] +imageSize = np.prod(ds.getDetectorDimensions()) + +gridMapper = QGridMapper(ds, + outputFileName, + outputType=BINARY_OUTPUT, + transform=UnityTransform3D(), + gridWriter=VTIGridWriter(), + appConfig=appConfig) + +gridMapper.setProgressUpdater(updateMapperProgress) +gridMapper.doMap() + +with open('time.log', 'a') as time_log: + time_log.write(f'End: {datetime.datetime.now()}\n') \ No newline at end of file diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py new file mode 100644 index 0000000..2ec98f9 --- /dev/null +++ b/rsMap3D/mappers/mpigridmapper.py @@ -0,0 +1,143 @@ +''' + Copyright (c) 2012, UChicago Argonne, LLC + See LICENSE file. +''' + +import xrayutilities as xu +from rsMap3D.mappers.abstractmapper import AbstractGridMapper +import numpy as np +from xrayutilities.exception import InputError +import logging +logger = logging.getLogger(__name__) + +class MPIQGridMapper(AbstractGridMapper): + ''' + Override parent class to add in output type + ''' + def __init__(self, dataSource, \ + outputFileName, \ + outputType, \ + mpi_rank, \ + nx=200, ny=201, nz=202, \ + transform = None, \ + gridWriter = None, + **kwargs): + super(MPIQGridMapper, self).__init__(dataSource, \ + outputFileName, \ + nx=nx, ny=ny, nz=nz, \ + transform = transform, \ + gridWriter = gridWriter, \ + **kwargs) + self.outputType = outputType + self.mpi_rank = mpi_rank + + def getFileInfo(self): + ''' + Override parent class to add in output type + ''' + return (self.dataSource.projectName, + self.dataSource.availableScans[0], + self.nx, self.ny, self.nz, + self.outputFileName, + self.outputType) + + ''' + This map provides an x, y, z grid of the data. + ''' + #@profile + def processMap(self, **kwargs): + """ + read ad frames and grid them in reciprocal space + angular coordinates are taken from the spec file + + **kwargs are passed to the rawmap function + """ + maxImageMem = self.appConfig.getMaxImageMemory() + gridder = xu.Gridder3D(self.nx, self.ny, self.nz) + gridder.KeepData(True) + rangeBounds = self.dataSource.getRangeBounds() + try: + # repository version or xrayutilities > 1.0.6 + gridder.dataRange(rangeBounds[0], rangeBounds[1], + rangeBounds[2], rangeBounds[3], + rangeBounds[4], rangeBounds[5], + True) + except: + # xrayutilities 1.0.6 and below + gridder.dataRange((rangeBounds[0], rangeBounds[1]), + (rangeBounds[2], rangeBounds[3]), + (rangeBounds[4], rangeBounds[5]), + True) + + imageToBeUsed = self.dataSource.getImageToBeUsed() + progress = 0 + for scan in self.dataSource.getAvailableScans(): + + if True in imageToBeUsed[scan]: + imageSize = self.dataSource.getDetectorDimensions()[0] * \ + self.dataSource.getDetectorDimensions()[1] + numImages = len(imageToBeUsed[scan]) + if imageSize*4*numImages <= maxImageMem: + kwargs['mask'] = imageToBeUsed[scan] + qx, qy, qz, intensity = self.dataSource.rawmap((scan,), **kwargs) + + # convert data to rectangular grid in reciprocal space + gridder(qx, qy, qz, intensity) + progress += 100 + if self.progressUpdater is not None: + self.progressUpdater(progress) + else: + nPasses = int(imageSize*4*numImages/ maxImageMem + 1) + + for thisPass in range(nPasses): + imageToBeUsedInPass = np.array(imageToBeUsed[scan]) + imageToBeUsedInPass[:int(thisPass*numImages/nPasses)] = False + imageToBeUsedInPass[int((thisPass+1)*numImages/nPasses):] = False + + if True in imageToBeUsedInPass: + kwargs['mask'] = imageToBeUsedInPass + qx, qy, qz, intensity = \ + self.dataSource.rawmap((scan,), **kwargs) + # convert data to rectangular grid in reciprocal space + try: + gridder(qx, qy, qz, intensity) + + progress += 1.0/nPasses* 100.0 + if self.progressUpdater is not None: + self.progressUpdater(progress) + except InputError as ex: + print ("Wrong Input to gridder") + print ("qx Size: " + str( qx.shape)) + print ("qy Size: " + str( qy.shape)) + print ("qz Size: " + str( qz.shape)) + print ("intensity Size: " + str(intensity.shape)) + raise InputError(ex) + else: + progress += 1.0/nPasses* 100.0 + if self.progressUpdater is not None: + self.progressUpdater(progress) + self.progressUpdater(100.0) + return gridder.xaxis,gridder.yaxis,gridder.zaxis,gridder.data,gridder + + def doMap(self): + ''' + Produce a q map of the data. This is the method typically called to + run the mapper. This method calls the processMap method which is an + abstract method which needs to be defined in subclasses. + ''' + + # read and grid data with helper function + _start_time = time.time() + #rangeBounds = self.dataSource.getRangeBounds() + qx, qy, qz, gint, gridder = \ + self.processMap() + print ('Elapsed time for gridding: %.3f seconds' % \ + (time.time() - _start_time)) + + # print some information + print ('qx: ', qx.min(), ' .... ', qx.max()) + print ('qy: ', qy.min(), ' .... ', qy.max()) + print ('qz: ', qz.min(), ' .... ', qz.max()) + self.gridWriter.setData(qx, qy, qz, gint) + self.gridWriter.setFileInfo(self.getFileInfo()) + self.gridWriter.write() \ No newline at end of file From 0923d14671960694cb4f882784c97aa12e4ce012 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Fri, 9 Sep 2022 16:54:39 -0500 Subject: [PATCH 03/20] Add initial parallelization prototype --- Scripts/mpiMapSpecAngleScan.py | 15 +-- Scripts/mpi_mapSpecParametricScan.py | 136 --------------------------- rsMap3D/mappers/mpigridmapper.py | 86 +++++++++++------ 3 files changed, 67 insertions(+), 170 deletions(-) delete mode 100644 Scripts/mpi_mapSpecParametricScan.py diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index c66eab6..57c94d6 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -37,8 +37,9 @@ def updateMapperProgress(value1): with open('config.json', 'r') as config_f: config = json.load(config_f) -with open('time.log', 'a') as time_log: - time_log.write(f'Start: {datetime.datetime.now()}\n') +if mpi_rank == 0: + with open('time.log', 'a') as time_log: + time_log.write(f'Start: {datetime.datetime.now()}\n') # @@ -111,8 +112,9 @@ def updateMapperProgress(value1): imageSize = np.prod(ds.getDetectorDimensions()) gridMapper = MPIQGridMapper(ds, - outputFileName, - outputType=BINARY_OUTPUT, + outputFileName, + BINARY_OUTPUT, + mpi_comm, transform=UnityTransform3D(), gridWriter=VTIGridWriter(), appConfig=appConfig) @@ -120,5 +122,6 @@ def updateMapperProgress(value1): gridMapper.setProgressUpdater(updateMapperProgress) gridMapper.doMap() -with open('time.log', 'a') as time_log: - time_log.write(f'End: {datetime.datetime.now()}\n') \ No newline at end of file +if mpi_rank == 0: + with open('time.log', 'a') as time_log: + time_log.write(f'End: {datetime.datetime.now()}\n') \ No newline at end of file diff --git a/Scripts/mpi_mapSpecParametricScan.py b/Scripts/mpi_mapSpecParametricScan.py deleted file mode 100644 index 1bda687..0000000 --- a/Scripts/mpi_mapSpecParametricScan.py +++ /dev/null @@ -1,136 +0,0 @@ -''' - Copyright (c) 2017, UChicago Argonne, LLC - See LICENSE file. -''' -''' -Script to process a dataset using the Sector33SpecDataSource. This processes -a parametric scan where each line of the scan represents a different sample -environment parameter with all angles constant. -''' - -import os -import numpy as np -import xrayutilities as xu -from rsMap3D.datasource.Sector33SpecDataSource import Sector33SpecDataSource -from rsMap3D.datasource.DetectorGeometryForXrayutilitiesReader import DetectorGeometryForXrayutilitiesReader as detReader -from rsMap3D.utils.srange import srange -from rsMap3D.config.rsmap3dconfigparser import RSMap3DConfigParser -from rsMap3D.constants import ENERGY_WAVELENGTH_CONVERT_FACTOR -from rsMap3D.mappers.gridmapper import QGridMapper -from rsMap3D.gui.rsm3dcommonstrings import BINARY_OUTPUT -from rsMap3D.transforms.unitytransform3d import UnityTransform3D -from rsMap3D.mappers.output.vtigridwriter import VTIGridWriter -import json - -def updateDataSourceProgress(value1, value2): - print("DataSource Progress %s/%s" % (value1, value2)) - -def updateMapperProgress(value1): - print("Mapper Progress %s" % (value1)) - - -with open('config.json', 'r') as config_f: - config = json.load(config_f) - - - -# -#Change values listed here for a particular experiment -# -roi = None # defined later -projectDir = config["project_dir"] -specFile = config["spec_file"] - -scanList1 = ["172-173"] -#scanList2 = ['45-69',] - -mapHKL = True - -configDir = projectDir -detectorConfigName = os.path.join(configDir, config["detector_config"]) -instConfigName = os.path.join(configDir, config["instrument_config"]) -#instConfigName = os.path.join(configDir, "6IDB_Instrument_zPrimary.xml") -#badPixelFile = os.path.join(configDir, None) -if not os.path.exists(detectorConfigName): - raise Exception("Detector Config file does not exist: %s" % - detectorConfigName) -if not os.path.exists(instConfigName): - raise Exception("Instrument Config file does not exist: %s" % - instConfigName) -# if not os.path.exists(badPixelFile): -# raise Exception("Bad Pixel file does not exist: %s" % -# badPixelFile) - -dReader = detReader(detectorConfigName) - -#detectorSettings -detectorName = "Pilatus" -# set to reduce data by averaging pixels 1,1 produces no averaging of pixels, -# uses the original data -bin = [1,1] -# Region of interesting -# set explicitly since the images are cropped by ROI -# COMMENT OUT roi or set to none to simply grab size from the calib file -roi = [1, 487, 1, 195] -# or get info from the config -if roi is None: - detector = dReader.getDetectorById(detectorName) - nPixels = dReader.getNpixels(detector) - roi = [1, nPixels[0], 1, nPixels[1]] -print ("ROI: %s " % roi) - -#output info = -nx = 300 -ny = 300 -nz = 10 -specName, specExt = os.path.splitext(specFile) - -# note that the q ranges in the three dimensions are available -# later. If you know resolution desired, it is possible to calculate more -# appropriate nx, ny, nz - -appConfig = RSMap3DConfigParser() -maxImageMemory = appConfig.getMaxImageMemory() -for scans in scanList1: - scanRange = srange(scans).list() - print ("scanRange %s" % scanRange) - print("specName, specExt: %s, %s" % (specName, specExt)) - ds = Sector33SpecDataSource(projectDir, specName, specExt, - instConfigName, detectorConfigName, roi=roi, - pixelsToAverage=bin, scanList= scanRange, -# badPixelFile=badPixelFile, - appConfig=appConfig) - ds.setCurrentDetector(detectorName) - ds.setProgressUpdater(updateDataSourceProgress) - ds.loadSource(mapHKL=mapHKL) - ds.setRangeBounds(ds.getOverallRanges()) - imageToBeUsed = ds.getImageToBeUsed() - - print("imageToBeUsed %s" % imageToBeUsed) -# wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] - imageSize = np.prod(ds.getDetectorDimensions()) - print(scanRange[0]) - for imageInScan in range(1, len(imageToBeUsed[scanRange[0]])+1): - print ("Scan Line %d" % imageInScan) - - outputFileName = os.path.join(specName + \ - ('_N%d.vti' % imageInScan)) - gridWriter = VTIGridWriter() - - tmpImageUsed = imageToBeUsed[scanRange[0]] - savImageUsed = imageToBeUsed[scanRange[0]] - tmpImageUsed[imageInScan] = False - ds.imageToBeUsed[scanRange[0]] = [not i for i in tmpImageUsed] - gridMapper = QGridMapper(ds, - outputFileName, - outputType=BINARY_OUTPUT, - transform=UnityTransform3D(), - gridWriter=gridWriter, - appConfig=appConfig, - nx=nx, ny=ny, nz=nz) - - gridMapper.setProgressUpdater(updateMapperProgress) - gridMapper.doMap() - ds.imageToBeUsed[scanRange[0]] = savImageUsed - - \ No newline at end of file diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py index 2ec98f9..c0710bc 100644 --- a/rsMap3D/mappers/mpigridmapper.py +++ b/rsMap3D/mappers/mpigridmapper.py @@ -3,12 +3,24 @@ See LICENSE file. ''' +import time +from xml.dom.expatbuilder import InternalSubsetExtractor import xrayutilities as xu from rsMap3D.mappers.abstractmapper import AbstractGridMapper import numpy as np from xrayutilities.exception import InputError import logging +from mpi4py.futures import MPICommExecutor logger = logging.getLogger(__name__) +import datetime + + +def doPassExt(thisPass): + global _currMapper + print(f'Started_map {_currMapper.mpi_comm.Get_rank()} {datetime.datetime.now()}') + result = _currMapper.doPass(thisPass) + print(f'Ended map{_currMapper.mpi_comm.Get_rank()} {datetime.datetime.now()}') + return result class MPIQGridMapper(AbstractGridMapper): ''' @@ -17,7 +29,7 @@ class MPIQGridMapper(AbstractGridMapper): def __init__(self, dataSource, \ outputFileName, \ outputType, \ - mpi_rank, \ + mpi_comm, \ nx=200, ny=201, nz=202, \ transform = None, \ gridWriter = None, @@ -29,7 +41,7 @@ def __init__(self, dataSource, \ gridWriter = gridWriter, \ **kwargs) self.outputType = outputType - self.mpi_rank = mpi_rank + self.mpi_comm = mpi_comm def getFileInfo(self): ''' @@ -46,6 +58,7 @@ def getFileInfo(self): ''' #@profile def processMap(self, **kwargs): + global _currMapper """ read ad frames and grid them in reciprocal space angular coordinates are taken from the spec file @@ -88,37 +101,54 @@ def processMap(self, **kwargs): self.progressUpdater(progress) else: nPasses = int(imageSize*4*numImages/ maxImageMem + 1) - - for thisPass in range(nPasses): - imageToBeUsedInPass = np.array(imageToBeUsed[scan]) - imageToBeUsedInPass[:int(thisPass*numImages/nPasses)] = False - imageToBeUsedInPass[int((thisPass+1)*numImages/nPasses):] = False - - if True in imageToBeUsedInPass: - kwargs['mask'] = imageToBeUsedInPass - qx, qy, qz, intensity = \ - self.dataSource.rawmap((scan,), **kwargs) - # convert data to rectangular grid in reciprocal space - try: - gridder(qx, qy, qz, intensity) - + + passesList = list(range(nPasses)) + + self.pass_info = (kwargs, imageToBeUsed, scan, numImages, nPasses) + + _currMapper = self + with MPICommExecutor(self.mpi_comm, root=0) as executor: + if executor is not None: + for grid_data in executor.map(doPassExt, passesList): + if grid_data == -1: + continue + + qx, qy, qz, intensity = grid_data + try: + print(f'Starting Gridding {datetime.datetime.now()}') + gridder(qx, qy, qz, intensity) + + progress += 1.0/nPasses* 100.0 + if self.progressUpdater is not None: + self.progressUpdater(progress) + except InputError as ex: + print ("Wrong Input to gridder") + print ("qx Size: " + str( qx.shape)) + print ("qy Size: " + str( qy.shape)) + print ("qz Size: " + str( qz.shape)) + print ("intensity Size: " + str(intensity.shape)) + raise InputError(ex) + else: progress += 1.0/nPasses* 100.0 if self.progressUpdater is not None: self.progressUpdater(progress) - except InputError as ex: - print ("Wrong Input to gridder") - print ("qx Size: " + str( qx.shape)) - print ("qy Size: " + str( qy.shape)) - print ("qz Size: " + str( qz.shape)) - print ("intensity Size: " + str(intensity.shape)) - raise InputError(ex) - else: - progress += 1.0/nPasses* 100.0 - if self.progressUpdater is not None: - self.progressUpdater(progress) - self.progressUpdater(100.0) + return gridder.xaxis,gridder.yaxis,gridder.zaxis,gridder.data,gridder + + def doPass(self, thisPass): + (kwargs, imageToBeUsed, scan, numImages, nPasses) = self.pass_info + + imageToBeUsedInPass = np.array(imageToBeUsed[scan]) + imageToBeUsedInPass[:int(thisPass*numImages/nPasses)] = False + imageToBeUsedInPass[int((thisPass+1)*numImages/nPasses):] = False + + if True in imageToBeUsedInPass: + kwargs['mask'] = imageToBeUsedInPass + return self.dataSource.rawmap((scan,), **kwargs) + + return -1 + def doMap(self): ''' Produce a q map of the data. This is the method typically called to From a61d6597eb05a0a3eda85846feb270859c9c0a6f Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Mon, 12 Sep 2022 15:40:37 -0500 Subject: [PATCH 04/20] Implement gridder moving strategy --- Scripts/mpiMapSpecAngleScan.py | 2 +- Scripts/profileMapSpecAngleScan.py | 2 +- rsMap3D/mappers/mpigridmapper.py | 157 ++++++++++++++++++----------- 3 files changed, 98 insertions(+), 63 deletions(-) diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index 57c94d6..ee99484 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -107,7 +107,7 @@ def updateMapperProgress(value1): ds.loadSource(mapHKL=mapHKL) ds.setRangeBounds(ds.getOverallRanges()) imageToBeUsed = ds.getImageToBeUsed() -print("imageToBeUsed %s" % imageToBeUsed) +#print("imageToBeUsed %s" % imageToBeUsed) # wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] imageSize = np.prod(ds.getDetectorDimensions()) diff --git a/Scripts/profileMapSpecAngleScan.py b/Scripts/profileMapSpecAngleScan.py index 82e6256..af88332 100644 --- a/Scripts/profileMapSpecAngleScan.py +++ b/Scripts/profileMapSpecAngleScan.py @@ -99,7 +99,7 @@ def updateMapperProgress(value1): ds.loadSource(mapHKL=mapHKL) ds.setRangeBounds(ds.getOverallRanges()) imageToBeUsed = ds.getImageToBeUsed() -print("imageToBeUsed %s" % imageToBeUsed) +#print("imageToBeUsed %s" % imageToBeUsed) # wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] imageSize = np.prod(ds.getDetectorDimensions()) diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py index c0710bc..08d2991 100644 --- a/rsMap3D/mappers/mpigridmapper.py +++ b/rsMap3D/mappers/mpigridmapper.py @@ -10,18 +10,13 @@ import numpy as np from xrayutilities.exception import InputError import logging -from mpi4py.futures import MPICommExecutor +from mpi4py import MPI logger = logging.getLogger(__name__) -import datetime +from datetime import datetime +import sys +import pickle -def doPassExt(thisPass): - global _currMapper - print(f'Started_map {_currMapper.mpi_comm.Get_rank()} {datetime.datetime.now()}') - result = _currMapper.doPass(thisPass) - print(f'Ended map{_currMapper.mpi_comm.Get_rank()} {datetime.datetime.now()}') - return result - class MPIQGridMapper(AbstractGridMapper): ''' Override parent class to add in output type @@ -41,7 +36,8 @@ def __init__(self, dataSource, \ gridWriter = gridWriter, \ **kwargs) self.outputType = outputType - self.mpi_comm = mpi_comm + self.mpi_comm = mpi_comm + self.mpi_rank = mpi_comm.Get_rank() def getFileInfo(self): ''' @@ -58,7 +54,6 @@ def getFileInfo(self): ''' #@profile def processMap(self, **kwargs): - global _currMapper """ read ad frames and grid them in reciprocal space angular coordinates are taken from the spec file @@ -84,7 +79,33 @@ def processMap(self, **kwargs): imageToBeUsed = self.dataSource.getImageToBeUsed() progress = 0 - for scan in self.dataSource.getAvailableScans(): + + + # Init gridder buffer on proc 0 + if self.mpi_rank == 0: + gridder_pickle = pickle.dumps(gridder) + win_size = len(gridder_pickle) * 2 + else: + win_size = 0 + + win = MPI.Win.Allocate(win_size, comm=self.mpi_comm) + + if self.mpi_rank == 0: + win.Lock(rank=0) + win.Put([gridder_pickle, MPI.BYTE], target_rank=0) + win.Unlock(rank=0) + + # Sync forced here + gridder_buff_size = self.mpi_comm.bcast(win_size, root=0) + + + scans = self.dataSource.getAvailableScans() + scans_split = np.array_split(scans, self.mpi_comm.size) + ind_scans= self.mpi_comm.scatter(scans_split, root=0) + print(ind_scans) + + # TODO: SET TO SCANS + for scan in ind_scans: if True in imageToBeUsed[scan]: imageSize = self.dataSource.getDetectorDimensions()[0] * \ @@ -93,61 +114,74 @@ def processMap(self, **kwargs): if imageSize*4*numImages <= maxImageMem: kwargs['mask'] = imageToBeUsed[scan] qx, qy, qz, intensity = self.dataSource.rawmap((scan,), **kwargs) - - # convert data to rectangular grid in reciprocal space + + + raise ValueError("TODO: REMOVE FOR TESTING") gridder(qx, qy, qz, intensity) + progress += 100 if self.progressUpdater is not None: self.progressUpdater(progress) else: nPasses = int(imageSize*4*numImages/ maxImageMem + 1) - passesList = list(range(nPasses)) - - self.pass_info = (kwargs, imageToBeUsed, scan, numImages, nPasses) - - _currMapper = self - with MPICommExecutor(self.mpi_comm, root=0) as executor: - if executor is not None: - for grid_data in executor.map(doPassExt, passesList): - if grid_data == -1: - continue - - qx, qy, qz, intensity = grid_data - try: - print(f'Starting Gridding {datetime.datetime.now()}') - gridder(qx, qy, qz, intensity) - - progress += 1.0/nPasses* 100.0 - if self.progressUpdater is not None: - self.progressUpdater(progress) - except InputError as ex: - print ("Wrong Input to gridder") - print ("qx Size: " + str( qx.shape)) - print ("qy Size: " + str( qy.shape)) - print ("qz Size: " + str( qz.shape)) - print ("intensity Size: " + str(intensity.shape)) - raise InputError(ex) - else: + + passes = np.array_split(list(range(nPasses)), self.mpi_comm.size) + ind_passes = self.mpi_comm.scatter(passes, root=0) + + # CURRENTLY SET TO SCANS + for thisPass in range(nPasses): + imageToBeUsedInPass = np.array(imageToBeUsed[scan]) + imageToBeUsedInPass[:int(thisPass*numImages/nPasses)] = False + imageToBeUsedInPass[int((thisPass+1)*numImages/nPasses):] = False + + if True in imageToBeUsedInPass: + kwargs['mask'] = imageToBeUsedInPass + qx, qy, qz, intensity = \ + self.dataSource.rawmap((scan,), **kwargs) + # convert data to rectangular grid in reciprocal space + try: + + gridder_buff = bytearray(gridder_buff_size) + win.Lock(rank=0) + print(f'R: {self.mpi_rank} Acquired Gridder {datetime.now():%M:%S}') + win.Get([gridder_buff, MPI.BYTE], target_rank=0) + print(f'Gridder Loaded {datetime.now():%M:%S}') + gridder = pickle.loads(gridder_buff) + gridder(qx, qy, qz, intensity) + print(f'R: {self.mpi_rank} Gridding Complete {datetime.now():%M:%S}') + gridder_buff = pickle.dumps(gridder) + win.Put([gridder_buff, MPI.BYTE], target_rank=0) + print(f'Gridder Sent {datetime.now():%M:%S}') + print(f'R: {self.mpi_rank} Released Complete {datetime.now():%M:%S}\n\n') + win.Unlock(rank=0) + progress += 1.0/nPasses* 100.0 if self.progressUpdater is not None: self.progressUpdater(progress) - - return gridder.xaxis,gridder.yaxis,gridder.zaxis,gridder.data,gridder - + except InputError as ex: + print ("Wrong Input to gridder") + print ("qx Size: " + str( qx.shape)) + print ("qy Size: " + str( qy.shape)) + print ("qz Size: " + str( qz.shape)) + print ("intensity Size: " + str(intensity.shape)) + raise InputError(ex) + else: + progress += 1.0/nPasses* 100.0 + if self.progressUpdater is not None: + self.progressUpdater(progress) + self.progressUpdater(100.0) - def doPass(self, thisPass): - (kwargs, imageToBeUsed, scan, numImages, nPasses) = self.pass_info + self.mpi_comm.Barrier() - imageToBeUsedInPass = np.array(imageToBeUsed[scan]) - imageToBeUsedInPass[:int(thisPass*numImages/nPasses)] = False - imageToBeUsedInPass[int((thisPass+1)*numImages/nPasses):] = False + if self.mpi_rank == 0: + gridder_buff = bytearray(gridder_buff_size) + win.Lock(rank=0) + win.Get([gridder_buff, MPI.BYTE], target_rank=0) + gridder = pickle.loads(gridder_buff) + win.Unlock(rank=0) - if True in imageToBeUsedInPass: - kwargs['mask'] = imageToBeUsedInPass - return self.dataSource.rawmap((scan,), **kwargs) - - return -1 + return gridder.xaxis,gridder.yaxis,gridder.zaxis,gridder.data,gridder def doMap(self): ''' @@ -164,10 +198,11 @@ def doMap(self): print ('Elapsed time for gridding: %.3f seconds' % \ (time.time() - _start_time)) - # print some information - print ('qx: ', qx.min(), ' .... ', qx.max()) - print ('qy: ', qy.min(), ' .... ', qy.max()) - print ('qz: ', qz.min(), ' .... ', qz.max()) - self.gridWriter.setData(qx, qy, qz, gint) - self.gridWriter.setFileInfo(self.getFileInfo()) - self.gridWriter.write() \ No newline at end of file + if self.mpi_rank == 0: + # print some information + print ('qx: ', qx.min(), ' .... ', qx.max()) + print ('qy: ', qy.min(), ' .... ', qy.max()) + print ('qz: ', qz.min(), ' .... ', qz.max()) + self.gridWriter.setData(qx, qy, qz, gint) + self.gridWriter.setFileInfo(self.getFileInfo()) + self.gridWriter.write() \ No newline at end of file From 1705f9cb879781fdf24627e428a788b0a0f4ce04 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Tue, 13 Sep 2022 10:08:30 -0500 Subject: [PATCH 05/20] Implement dynamic scan queue --- rsMap3D/mappers/mpigridmapper.py | 92 +++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 25 deletions(-) diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py index 08d2991..0e63110 100644 --- a/rsMap3D/mappers/mpigridmapper.py +++ b/rsMap3D/mappers/mpigridmapper.py @@ -16,6 +16,8 @@ import sys import pickle +SCAN_WIN_SIZE = 4 + class MPIQGridMapper(AbstractGridMapper): ''' @@ -84,28 +86,41 @@ def processMap(self, **kwargs): # Init gridder buffer on proc 0 if self.mpi_rank == 0: gridder_pickle = pickle.dumps(gridder) - win_size = len(gridder_pickle) * 2 + win_size = round(len(gridder_pickle) * 1.5) else: win_size = 0 - win = MPI.Win.Allocate(win_size, comm=self.mpi_comm) + gridderWin = MPI.Win.Allocate(win_size, comm=self.mpi_comm) if self.mpi_rank == 0: - win.Lock(rank=0) - win.Put([gridder_pickle, MPI.BYTE], target_rank=0) - win.Unlock(rank=0) + gridderWin.Lock(rank=0) + gridderWin.Put([gridder_pickle, MPI.BYTE], target_rank=0) + gridderWin.Unlock(rank=0) # Sync forced here gridder_buff_size = self.mpi_comm.bcast(win_size, root=0) scans = self.dataSource.getAvailableScans() - scans_split = np.array_split(scans, self.mpi_comm.size) - ind_scans= self.mpi_comm.scatter(scans_split, root=0) - print(ind_scans) - # TODO: SET TO SCANS - for scan in ind_scans: + if self.mpi_rank == 0: + scanWinSize = SCAN_WIN_SIZE + else: + scanWinSize = 0 + + scanWin = MPI.Win.Allocate(scanWinSize, comm=self.mpi_comm) + + scanIdx = self.mpi_rank + if self.mpi_rank == 0: + scanWin.Lock(rank=0) + scanWin.Put([self.mpi_comm.size.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Unlock(rank=0) + + self.mpi_comm.Barrier() + + while scanIdx < len(scans): + print(f'Starting Scan {scanIdx} of {len(scans)}') + scan = scans[scanIdx] if True in imageToBeUsed[scan]: imageSize = self.dataSource.getDetectorDimensions()[0] * \ @@ -113,23 +128,39 @@ def processMap(self, **kwargs): numImages = len(imageToBeUsed[scan]) if imageSize*4*numImages <= maxImageMem: kwargs['mask'] = imageToBeUsed[scan] + print(f'R: {self.mpi_rank} Mapping {datetime.now():%M:%S}') qx, qy, qz, intensity = self.dataSource.rawmap((scan,), **kwargs) + try: + + gridder_buff = bytearray(gridder_buff_size) + gridderWin.Lock(rank=0) + print(f'R: {self.mpi_rank} Acquired Gridder {datetime.now():%M:%S}') + gridderWin.Get([gridder_buff, MPI.BYTE], target_rank=0) + print(f'Gridder Loaded {datetime.now():%M:%S}') + gridder = pickle.loads(gridder_buff) + gridder(qx, qy, qz, intensity) + print(f'R: {self.mpi_rank} Gridding Complete {datetime.now():%M:%S}') + gridder_buff = pickle.dumps(gridder) + gridderWin.Put([gridder_buff, MPI.BYTE], target_rank=0) + print(f'Gridder Sent {datetime.now():%M:%S}') + print(f'R: {self.mpi_rank} Released Complete {datetime.now():%M:%S}\n\n') + gridderWin.Unlock(rank=0) + + except InputError as ex: + print ("Wrong Input to gridder") + print ("qx Size: " + str( qx.shape)) + print ("qy Size: " + str( qy.shape)) + print ("qz Size: " + str( qz.shape)) + print ("intensity Size: " + str(intensity.shape)) + raise InputError(ex) - raise ValueError("TODO: REMOVE FOR TESTING") - gridder(qx, qy, qz, intensity) progress += 100 if self.progressUpdater is not None: self.progressUpdater(progress) else: nPasses = int(imageSize*4*numImages/ maxImageMem + 1) - - - passes = np.array_split(list(range(nPasses)), self.mpi_comm.size) - ind_passes = self.mpi_comm.scatter(passes, root=0) - - # CURRENTLY SET TO SCANS for thisPass in range(nPasses): imageToBeUsedInPass = np.array(imageToBeUsed[scan]) imageToBeUsedInPass[:int(thisPass*numImages/nPasses)] = False @@ -143,18 +174,18 @@ def processMap(self, **kwargs): try: gridder_buff = bytearray(gridder_buff_size) - win.Lock(rank=0) + gridderWin.Lock(rank=0) print(f'R: {self.mpi_rank} Acquired Gridder {datetime.now():%M:%S}') - win.Get([gridder_buff, MPI.BYTE], target_rank=0) + gridderWin.Get([gridder_buff, MPI.BYTE], target_rank=0) print(f'Gridder Loaded {datetime.now():%M:%S}') gridder = pickle.loads(gridder_buff) gridder(qx, qy, qz, intensity) print(f'R: {self.mpi_rank} Gridding Complete {datetime.now():%M:%S}') gridder_buff = pickle.dumps(gridder) - win.Put([gridder_buff, MPI.BYTE], target_rank=0) + gridderWin.Put([gridder_buff, MPI.BYTE], target_rank=0) print(f'Gridder Sent {datetime.now():%M:%S}') print(f'R: {self.mpi_rank} Released Complete {datetime.now():%M:%S}\n\n') - win.Unlock(rank=0) + gridderWin.Unlock(rank=0) progress += 1.0/nPasses* 100.0 if self.progressUpdater is not None: @@ -170,16 +201,27 @@ def processMap(self, **kwargs): progress += 1.0/nPasses* 100.0 if self.progressUpdater is not None: self.progressUpdater(progress) + + scanBuff = bytearray(SCAN_WIN_SIZE) + scanWin.Lock(rank=0) + scanWin.Get([scanBuff, MPI.BYTE], target_rank=0) + scanIdx = int.from_bytes(scanBuff, 'little') + print(f'Recieved Scan {scanIdx}') + scanNext = scanIdx + 1 + scanWin.Put([scanNext.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Unlock(rank=0) + + self.progressUpdater(100.0) self.mpi_comm.Barrier() if self.mpi_rank == 0: gridder_buff = bytearray(gridder_buff_size) - win.Lock(rank=0) - win.Get([gridder_buff, MPI.BYTE], target_rank=0) + gridderWin.Lock(rank=0) + gridderWin.Get([gridder_buff, MPI.BYTE], target_rank=0) gridder = pickle.loads(gridder_buff) - win.Unlock(rank=0) + gridderWin.Unlock(rank=0) return gridder.xaxis,gridder.yaxis,gridder.zaxis,gridder.data,gridder From bd3d847a5f46e9f3adfb5d9c4c6aa7ed7493e93a Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Tue, 13 Sep 2022 13:22:16 -0500 Subject: [PATCH 06/20] Add diff outputs to profiling scripts --- Scripts/mpiMapSpecAngleScan.py | 7 +++++-- Scripts/profileMapSpecAngleScan.py | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index ee99484..3ae99e6 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -38,8 +38,9 @@ def updateMapperProgress(value1): config = json.load(config_f) if mpi_rank == 0: + start_time = datetime.datetime.now() with open('time.log', 'a') as time_log: - time_log.write(f'Start: {datetime.datetime.now()}\n') + time_log.write(f'Start: {start_time}\n') # @@ -124,4 +125,6 @@ def updateMapperProgress(value1): if mpi_rank == 0: with open('time.log', 'a') as time_log: - time_log.write(f'End: {datetime.datetime.now()}\n') \ No newline at end of file + end_time = datetime.datetime.now() + time_log.write(f'End: {end_time}\n') + time_log.write(f'Diff: {end_time - start_time}\n') \ No newline at end of file diff --git a/Scripts/profileMapSpecAngleScan.py b/Scripts/profileMapSpecAngleScan.py index af88332..4d4fcf7 100644 --- a/Scripts/profileMapSpecAngleScan.py +++ b/Scripts/profileMapSpecAngleScan.py @@ -30,8 +30,9 @@ def updateMapperProgress(value1): with open('config.json', 'r') as config_f: config = json.load(config_f) +start_time = datetime.datetime.now() with open('time.log', 'a') as time_log: - time_log.write(f'Start: {datetime.datetime.now()}\n') + time_log.write(f'Start: {start_time}\n') # @@ -114,4 +115,6 @@ def updateMapperProgress(value1): gridMapper.doMap() with open('time.log', 'a') as time_log: - time_log.write(f'End: {datetime.datetime.now()}\n') \ No newline at end of file + end_time = datetime.datetime.now() + time_log.write(f'End: {end}\n') + time_log.write(f'Diff: {end_time - start_time}\n') \ No newline at end of file From 5a4ef7922c5f955c2366ff9027c97c2502ea569b Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Tue, 13 Sep 2022 14:04:09 -0500 Subject: [PATCH 07/20] Implement single-process data loading --- Scripts/mpiMapSpecAngleScan.py | 16 ++++++++- rsMap3D/datasource/Sector33SpecDataSource.py | 36 ++++++++++++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index 3ae99e6..d455d13 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -105,7 +105,21 @@ def updateMapperProgress(value1): appConfig=appConfig) ds.setCurrentDetector(detectorName) ds.setProgressUpdater(updateDataSourceProgress) -ds.loadSource(mapHKL=mapHKL) + +loadScans = mpi_rank == 0 + +ds.loadSource(mapHKL=mapHKL, loadScans=loadScans) + +if mpi_rank == 0: + scanData = ds.exportScans() +else: + scanData = None + +scanData = mpi_comm.bcast(scanData, root=0) + +if mpi_rank != 0: + ds.importScans(scanData) + ds.setRangeBounds(ds.getOverallRanges()) imageToBeUsed = ds.getImageToBeUsed() #print("imageToBeUsed %s" % imageToBeUsed) diff --git a/rsMap3D/datasource/Sector33SpecDataSource.py b/rsMap3D/datasource/Sector33SpecDataSource.py index 42bf5a3..e54db1d 100644 --- a/rsMap3D/datasource/Sector33SpecDataSource.py +++ b/rsMap3D/datasource/Sector33SpecDataSource.py @@ -221,7 +221,7 @@ def hotpixelkill(self, areaData): return areaData - def loadSource(self, mapHKL=False): + def loadSource(self, mapHKL=False, loadScans=True): ''' This method does the work of loading data from the files. This has been split off from the constructor to allow this to be threaded and later @@ -250,9 +250,14 @@ def loadSource(self, mapHKL=False): if not (self.flatFieldFile is None): self.flatFieldData = np.array(Image.open(self.flatFieldFile)).T # Load scan information from the spec file + + self.sd = SpecDataFile(self.specFile) + self.mapHKL = mapHKL + + if not loadScans: + return + try: - self.sd = SpecDataFile(self.specFile) - self.mapHKL = mapHKL maxScan = int(self.sd.getMaxScanNumber()) logger.debug("Number of Scans" + str(maxScan)) if self.scans is None: @@ -323,6 +328,31 @@ def loadSource(self, mapHKL=False): self.availableScanTypes = set(self.scanType.values()) + def exportScans(self): + """ + Returns loaded data as an MPI-safe object + """ + return { + 'scans': self.scans, + 'imageBounds': self.imageBounds, + 'imageToBeUsed': self.imageToBeUsed, + 'availableScans':self.availableScans, + 'incidentEnergy': self.incidentEnergy, + 'ubMatrix':self.ubMatrix, + 'availableScanTypes': self.availableScanTypes + } + + def importScans(self, scanData): + """ + Imports exported scan data from another data source via an MPI-safe object. + """ + self.scans = scanData['scans'] + self.imageBounds = scanData['imageBounds'] + self.imageToBeUsed = scanData['imageToBeUsed'] + self.availableScans = scanData['availableScans'] + self.incidentEnergy = scanData['incidentEnergy'] + self.ubMatrix = scanData['ubMatrix'] + self.availableScanTypes = scanData['availableScanTypes'] def to_bool(self, value): """ From 8651d1455da11bb39c0dbe8fc2e001b944f96c7f Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Wed, 14 Sep 2022 11:34:04 -0500 Subject: [PATCH 08/20] Add source load times to logging --- Scripts/mpiMapSpecAngleScan.py | 2 ++ Scripts/profileMapSpecAngleScan.py | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index d455d13..3823c67 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -111,6 +111,8 @@ def updateMapperProgress(value1): ds.loadSource(mapHKL=mapHKL, loadScans=loadScans) if mpi_rank == 0: + with open('time.log', 'a') as time_log: + time_log.write(f'Source Load Time: {datetime.datetime.now()}\n') scanData = ds.exportScans() else: scanData = None diff --git a/Scripts/profileMapSpecAngleScan.py b/Scripts/profileMapSpecAngleScan.py index 4d4fcf7..b35e4ac 100644 --- a/Scripts/profileMapSpecAngleScan.py +++ b/Scripts/profileMapSpecAngleScan.py @@ -104,6 +104,9 @@ def updateMapperProgress(value1): # wavelen = ENERGY_WAVELENGTH_CONVERT_FACTOR/ds.getIncidentEnergy()[scans[0]] imageSize = np.prod(ds.getDetectorDimensions()) +with open('time.log', 'a') as time_log: + time_log.write(f'Source Load Time: {datetime.datetime.now()}\n') + gridMapper = QGridMapper(ds, outputFileName, outputType=BINARY_OUTPUT, @@ -116,5 +119,5 @@ def updateMapperProgress(value1): with open('time.log', 'a') as time_log: end_time = datetime.datetime.now() - time_log.write(f'End: {end}\n') + time_log.write(f'End: {end_time}\n') time_log.write(f'Diff: {end_time - start_time}\n') \ No newline at end of file From eb23090933a84ff4f2565590bd57a3d192e65615 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Wed, 14 Sep 2022 15:01:07 -0500 Subject: [PATCH 09/20] Implement merge strategy --- rsMap3D/mappers/mpigridmapper.py | 112 ++++++++++++++----------------- 1 file changed, 52 insertions(+), 60 deletions(-) diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py index 0e63110..1a9346b 100644 --- a/rsMap3D/mappers/mpigridmapper.py +++ b/rsMap3D/mappers/mpigridmapper.py @@ -15,6 +15,7 @@ from datetime import datetime import sys import pickle +import math SCAN_WIN_SIZE = 4 @@ -26,7 +27,7 @@ class MPIQGridMapper(AbstractGridMapper): def __init__(self, dataSource, \ outputFileName, \ outputType, \ - mpi_comm, \ + mpiComm, \ nx=200, ny=201, nz=202, \ transform = None, \ gridWriter = None, @@ -38,8 +39,8 @@ def __init__(self, dataSource, \ gridWriter = gridWriter, \ **kwargs) self.outputType = outputType - self.mpi_comm = mpi_comm - self.mpi_rank = mpi_comm.Get_rank() + self.mpiComm = mpiComm + self.mpiRank = mpiComm.Get_rank() def getFileInfo(self): ''' @@ -83,40 +84,22 @@ def processMap(self, **kwargs): progress = 0 - # Init gridder buffer on proc 0 - if self.mpi_rank == 0: - gridder_pickle = pickle.dumps(gridder) - win_size = round(len(gridder_pickle) * 1.5) - else: - win_size = 0 - - gridderWin = MPI.Win.Allocate(win_size, comm=self.mpi_comm) - - if self.mpi_rank == 0: - gridderWin.Lock(rank=0) - gridderWin.Put([gridder_pickle, MPI.BYTE], target_rank=0) - gridderWin.Unlock(rank=0) - - # Sync forced here - gridder_buff_size = self.mpi_comm.bcast(win_size, root=0) - - scans = self.dataSource.getAvailableScans() - if self.mpi_rank == 0: + if self.mpiRank == 0: scanWinSize = SCAN_WIN_SIZE else: scanWinSize = 0 - scanWin = MPI.Win.Allocate(scanWinSize, comm=self.mpi_comm) + scanWin = MPI.Win.Allocate(scanWinSize, comm=self.mpiComm) - scanIdx = self.mpi_rank - if self.mpi_rank == 0: + scanIdx = self.mpiRank + if self.mpiRank == 0: scanWin.Lock(rank=0) - scanWin.Put([self.mpi_comm.size.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Put([self.mpiComm.size.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) scanWin.Unlock(rank=0) - self.mpi_comm.Barrier() + self.mpiComm.Barrier() while scanIdx < len(scans): print(f'Starting Scan {scanIdx} of {len(scans)}') @@ -128,24 +111,15 @@ def processMap(self, **kwargs): numImages = len(imageToBeUsed[scan]) if imageSize*4*numImages <= maxImageMem: kwargs['mask'] = imageToBeUsed[scan] - print(f'R: {self.mpi_rank} Mapping {datetime.now():%M:%S}') + print(f'R: {self.mpiRank} Mapping {datetime.now():%M:%S}') qx, qy, qz, intensity = self.dataSource.rawmap((scan,), **kwargs) try: - gridder_buff = bytearray(gridder_buff_size) - gridderWin.Lock(rank=0) - print(f'R: {self.mpi_rank} Acquired Gridder {datetime.now():%M:%S}') - gridderWin.Get([gridder_buff, MPI.BYTE], target_rank=0) - print(f'Gridder Loaded {datetime.now():%M:%S}') - gridder = pickle.loads(gridder_buff) gridder(qx, qy, qz, intensity) - print(f'R: {self.mpi_rank} Gridding Complete {datetime.now():%M:%S}') - gridder_buff = pickle.dumps(gridder) - gridderWin.Put([gridder_buff, MPI.BYTE], target_rank=0) - print(f'Gridder Sent {datetime.now():%M:%S}') - print(f'R: {self.mpi_rank} Released Complete {datetime.now():%M:%S}\n\n') - gridderWin.Unlock(rank=0) + progress += 100 + if self.progressUpdater is not None: + self.progressUpdater(progress) except InputError as ex: print ("Wrong Input to gridder") @@ -173,19 +147,7 @@ def processMap(self, **kwargs): # convert data to rectangular grid in reciprocal space try: - gridder_buff = bytearray(gridder_buff_size) - gridderWin.Lock(rank=0) - print(f'R: {self.mpi_rank} Acquired Gridder {datetime.now():%M:%S}') - gridderWin.Get([gridder_buff, MPI.BYTE], target_rank=0) - print(f'Gridder Loaded {datetime.now():%M:%S}') - gridder = pickle.loads(gridder_buff) gridder(qx, qy, qz, intensity) - print(f'R: {self.mpi_rank} Gridding Complete {datetime.now():%M:%S}') - gridder_buff = pickle.dumps(gridder) - gridderWin.Put([gridder_buff, MPI.BYTE], target_rank=0) - print(f'Gridder Sent {datetime.now():%M:%S}') - print(f'R: {self.mpi_rank} Released Complete {datetime.now():%M:%S}\n\n') - gridderWin.Unlock(rank=0) progress += 1.0/nPasses* 100.0 if self.progressUpdater is not None: @@ -214,17 +176,47 @@ def processMap(self, **kwargs): self.progressUpdater(100.0) - self.mpi_comm.Barrier() - if self.mpi_rank == 0: - gridder_buff = bytearray(gridder_buff_size) - gridderWin.Lock(rank=0) - gridderWin.Get([gridder_buff, MPI.BYTE], target_rank=0) - gridder = pickle.loads(gridder_buff) - gridderWin.Unlock(rank=0) + gridder = self.mergeGridders(gridder) + self.mpiComm.Barrier() return gridder.xaxis,gridder.yaxis,gridder.zaxis,gridder.data,gridder + + def mergeGridders(self, gridder): + """ + Merges gridders, combining their grids till all data is collated at proc 0. + Similar to the upward execution of a distributed merge-sort. + """ + worldSize = self.mpiComm.Get_size() + + depth = math.floor(np.log2(worldSize)) + 1 + for curr_depth in range(depth): + + # Exclude procs that have merged + if self.mpiRank % (2**curr_depth) != 0: + break + + # Determine if sending or receiving + if (self.mpiRank / (2**curr_depth)) % 2 == 0: + source = self.mpiRank + (2**curr_depth) + + # Odd # world size + if source >= worldSize: + continue + + incomingGrid = self.mpiComm.recv(source=source) + # Normalization must be OFF and range must be FIXED for this to work + # These are the defaults as of the current version of xu (1.7.3) + gridder._gdata += incomingGrid._gdata + gridder._gnorm += incomingGrid._gnorm + + else: + dest = self.mpiRank - (2 ** curr_depth) + self.mpiComm.send(gridder, dest=dest) + return gridder + + def doMap(self): ''' Produce a q map of the data. This is the method typically called to @@ -240,7 +232,7 @@ def doMap(self): print ('Elapsed time for gridding: %.3f seconds' % \ (time.time() - _start_time)) - if self.mpi_rank == 0: + if self.mpiRank == 0: # print some information print ('qx: ', qx.min(), ' .... ', qx.max()) print ('qy: ', qy.min(), ' .... ', qy.max()) From cae3c60d49038528be9afefeecba698444a4e6f3 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Thu, 15 Sep 2022 11:25:09 -0500 Subject: [PATCH 10/20] Clean up MPI implementation --- Scripts/mpiMapSpecAngleScan.py | 42 +++++++++++--------- requirments.txt | 4 -- rsMap3D/datasource/Sector33SpecDataSource.py | 10 ++--- rsMap3D/mappers/mpigridmapper.py | 11 +++-- 4 files changed, 33 insertions(+), 34 deletions(-) delete mode 100644 requirments.txt diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index 3823c67..c815112 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -20,13 +20,19 @@ from rsMap3D.gui.rsm3dcommonstrings import BINARY_OUTPUT from rsMap3D.transforms.unitytransform3d import UnityTransform3D from rsMap3D.mappers.output.vtigridwriter import VTIGridWriter +import logging + +logging.basicConfig(format='%(asctime)s %(message)s', + datefmt='%m/%d/%Y %I:%M:%S %p', + filename='mpiMap.log', + level=logging.INFO) + from mpi4py import MPI from mpi4py.futures import MPICommExecutor -mpi_comm = MPI.COMM_WORLD -mpi_size = mpi_comm.Get_size() -mpi_rank = mpi_comm.Get_rank() +mpiComm = MPI.COMM_WORLD +mpiRank = mpiComm.Get_rank() def updateDataSourceProgress(value1, value2): print("DataSource Progress %s/%s" % (value1, value2)) @@ -37,10 +43,10 @@ def updateMapperProgress(value1): with open('config.json', 'r') as config_f: config = json.load(config_f) -if mpi_rank == 0: - start_time = datetime.datetime.now() - with open('time.log', 'a') as time_log: - time_log.write(f'Start: {start_time}\n') + +if mpiRank == 0: + startTime = datetime.datetime.now() + logging.info('Starting Mapping') # @@ -106,20 +112,19 @@ def updateMapperProgress(value1): ds.setCurrentDetector(detectorName) ds.setProgressUpdater(updateDataSourceProgress) -loadScans = mpi_rank == 0 +loadScans = mpiRank == 0 ds.loadSource(mapHKL=mapHKL, loadScans=loadScans) -if mpi_rank == 0: - with open('time.log', 'a') as time_log: - time_log.write(f'Source Load Time: {datetime.datetime.now()}\n') +if mpiRank == 0: + logging.info("Finished loading source") scanData = ds.exportScans() else: scanData = None -scanData = mpi_comm.bcast(scanData, root=0) +scanData = mpiComm.bcast(scanData, root=0) -if mpi_rank != 0: +if mpiRank != 0: ds.importScans(scanData) ds.setRangeBounds(ds.getOverallRanges()) @@ -131,7 +136,7 @@ def updateMapperProgress(value1): gridMapper = MPIQGridMapper(ds, outputFileName, BINARY_OUTPUT, - mpi_comm, + mpiComm, transform=UnityTransform3D(), gridWriter=VTIGridWriter(), appConfig=appConfig) @@ -139,8 +144,7 @@ def updateMapperProgress(value1): gridMapper.setProgressUpdater(updateMapperProgress) gridMapper.doMap() -if mpi_rank == 0: - with open('time.log', 'a') as time_log: - end_time = datetime.datetime.now() - time_log.write(f'End: {end_time}\n') - time_log.write(f'Diff: {end_time - start_time}\n') \ No newline at end of file +if mpiRank == 0: + logging.info('Ended map') + endTime = datetime.datetime.now() + logging.info(f'Diff: {endTime - startTime}\n') \ No newline at end of file diff --git a/requirments.txt b/requirments.txt deleted file mode 100644 index af47485..0000000 --- a/requirments.txt +++ /dev/null @@ -1,4 +0,0 @@ -PyQt5==5.15.6 -xrayutilities==1.7.3 -spec2nexus==2021.1.11 -vtk==9.1.0 \ No newline at end of file diff --git a/rsMap3D/datasource/Sector33SpecDataSource.py b/rsMap3D/datasource/Sector33SpecDataSource.py index e54db1d..28da353 100644 --- a/rsMap3D/datasource/Sector33SpecDataSource.py +++ b/rsMap3D/datasource/Sector33SpecDataSource.py @@ -251,13 +251,13 @@ def loadSource(self, mapHKL=False, loadScans=True): self.flatFieldData = np.array(Image.open(self.flatFieldFile)).T # Load scan information from the spec file - self.sd = SpecDataFile(self.specFile) - self.mapHKL = mapHKL + try: + self.sd = SpecDataFile(self.specFile) + self.mapHKL = mapHKL - if not loadScans: - return + if not loadScans: + return - try: maxScan = int(self.sd.getMaxScanNumber()) logger.debug("Number of Scans" + str(maxScan)) if self.scans is None: diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py index 1a9346b..828207a 100644 --- a/rsMap3D/mappers/mpigridmapper.py +++ b/rsMap3D/mappers/mpigridmapper.py @@ -111,7 +111,6 @@ def processMap(self, **kwargs): numImages = len(imageToBeUsed[scan]) if imageSize*4*numImages <= maxImageMem: kwargs['mask'] = imageToBeUsed[scan] - print(f'R: {self.mpiRank} Mapping {datetime.now():%M:%S}') qx, qy, qz, intensity = self.dataSource.rawmap((scan,), **kwargs) try: @@ -191,15 +190,15 @@ def mergeGridders(self, gridder): worldSize = self.mpiComm.Get_size() depth = math.floor(np.log2(worldSize)) + 1 - for curr_depth in range(depth): + for currDepth in range(depth): # Exclude procs that have merged - if self.mpiRank % (2**curr_depth) != 0: + if self.mpiRank % (2**currDepth) != 0: break # Determine if sending or receiving - if (self.mpiRank / (2**curr_depth)) % 2 == 0: - source = self.mpiRank + (2**curr_depth) + if (self.mpiRank / (2**currDepth)) % 2 == 0: + source = self.mpiRank + (2**currDepth) # Odd # world size if source >= worldSize: @@ -212,7 +211,7 @@ def mergeGridders(self, gridder): gridder._gnorm += incomingGrid._gnorm else: - dest = self.mpiRank - (2 ** curr_depth) + dest = self.mpiRank - (2 ** currDepth) self.mpiComm.send(gridder, dest=dest) return gridder From bbb0870d6e9d66e6d555aa53b4020c3861201503 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Thu, 15 Sep 2022 14:19:54 -0500 Subject: [PATCH 11/20] Implement data loading merging --- Scripts/mpiMapSpecAngleScan.py | 20 +- .../datasource/MpiSector33SpecDataSource.py | 651 ++++++++++++++++++ 2 files changed, 655 insertions(+), 16 deletions(-) create mode 100644 rsMap3D/datasource/MpiSector33SpecDataSource.py diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index c815112..7ef70b8 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -11,7 +11,7 @@ import xrayutilities as xu import json import datetime -from rsMap3D.datasource.Sector33SpecDataSource import Sector33SpecDataSource +from rsMap3D.datasource.MpiSector33SpecDataSource import MPISector33SpecDataSource from rsMap3D.datasource.DetectorGeometryForXrayutilitiesReader import DetectorGeometryForXrayutilitiesReader as detReader from rsMap3D.utils.srange import srange from rsMap3D.config.rsmap3dconfigparser import RSMap3DConfigParser @@ -105,27 +105,15 @@ def updateMapperProgress(value1): maxImageMemory = appConfig.getMaxImageMemory() print ("scanRange %s" % scanRange) print("specName, specExt: %s, %s" % (specName, specExt)) -ds = Sector33SpecDataSource(projectDir, specName, specExt, - instConfigName, detectorConfigName, roi=roi, +ds = MPISector33SpecDataSource(projectDir, specName, specExt, + instConfigName, detectorConfigName, mpiComm, roi=roi, pixelsToAverage=bin, scanList= scanRange, appConfig=appConfig) ds.setCurrentDetector(detectorName) ds.setProgressUpdater(updateDataSourceProgress) -loadScans = mpiRank == 0 +ds.loadSource(mapHKL=mapHKL) -ds.loadSource(mapHKL=mapHKL, loadScans=loadScans) - -if mpiRank == 0: - logging.info("Finished loading source") - scanData = ds.exportScans() -else: - scanData = None - -scanData = mpiComm.bcast(scanData, root=0) - -if mpiRank != 0: - ds.importScans(scanData) ds.setRangeBounds(ds.getOverallRanges()) imageToBeUsed = ds.getImageToBeUsed() diff --git a/rsMap3D/datasource/MpiSector33SpecDataSource.py b/rsMap3D/datasource/MpiSector33SpecDataSource.py new file mode 100644 index 0000000..60d8bd9 --- /dev/null +++ b/rsMap3D/datasource/MpiSector33SpecDataSource.py @@ -0,0 +1,651 @@ +''' + Copyright (c) 2012, UChicago Argonne, LLC + See LICENSE file. +''' +from multiprocessing.sharedctypes import Value +import os +from spec2nexus.spec import SpecDataFile +from rsMap3D.exception.rsmap3dexception import RSMap3DException,\ + ScanDataMissingException +from rsMap3D.gui.rsm3dcommonstrings import CANCEL_STR +from rsMap3D.datasource.pilatusbadpixelfile import PilatusBadPixelFile +from rsMap3D.mappers.abstractmapper import ProcessCanceledException +from rsMap3D.datasource.specxmldrivendatasource import SpecXMLDrivenDataSource +import numpy as np +import xrayutilities as xu +import time +import sys,traceback +import logging +import importlib +import math +from mpi4py import MPI +try: + from PIL import Image +except ImportError: + import Image +logger = logging.getLogger(__name__) + +SCAN_WIN_SIZE = 4 + +IMAGE_DIR_MERGE_STR = "images/%s" +SCAN_NUMBER_MERGE_STR = "S%03d" +TIFF_FILE_MERGE_STR = "S%%03d/%s_S%%03d_%%05d.tif" + +class MPISector33SpecDataSource(SpecXMLDrivenDataSource): + ''' + Class to load data from spec file and configuration xml files from + for the way that data is collected at sector 33. + :members + ''' + + + def __init__(self, + projectDir, + projectName, + projectExtension, + instConfigFile, + detConfigFile, + mpiComm, + **kwargs): + ''' + Constructor + :param projectDir: Directory holding the project file to open + :param projectName: First part of file name for the project + :param projectExt: File extension for the project file. + :param instConfigFile: Full path to Instrument configuration file. + :param detConfigFile: Full path to the detector configuration file + :param kwargs: Assorted keyword arguments + + :rtype: Sector33SpecDataSource + ''' + super(MPISector33SpecDataSource, self).__init__(projectDir, + projectName, + projectExtension, + instConfigFile, + detConfigFile, + **kwargs) + self.mpiComm = mpiComm + self.mpiRank = mpiComm.Get_rank() + + def _calc_eulerian_from_kappa(self, primaryAngles=None, + referenceAngles = None): + """ + Calculate the eulerian sample angles from the kappa stage angles. + :param primaryAngles: list of sample axis numbers to be handled by + the conversion + :param referenceAngles: list of reference angles to be used in angle + conversion + """ + + keta = np.deg2rad(referenceAngles[:,0]) + kappa = np.deg2rad(referenceAngles[:,1]) + kphi = np.deg2rad(referenceAngles[:,2]) + kappaParam = self.instConfig.getSampleAngleMappingParameter('kappa') + + try: + if kappaParam != None: + self.kalpha = np.deg2rad(float(kappaParam)) + else: + self.kalpha = np.deg2rad(50.000) + kappaInvertedParam = \ + self.instConfig.getSampleAngleMappingParameter('kappaInverted') + if kappaInvertedParam != None: + self.kappa_inverted = self.to_bool(kappaInvertedParam) + else: + self.kappa_inverted = False + except Exception as ex: + raise RSMap3DException("Error trying to get parameter for " + \ + "sampleAngleMappingFunction " + \ + "_calc_eulerian_from_kappa in inst config " + \ + "file\n" + \ + str(ex)) + + _t1 = np.arctan(np.tan(kappa / 2.0) * np.cos(self.kalpha)) + if self.kappa_inverted: + eta = np.rad2deg(keta + _t1) + phi = np.rad2deg(kphi + _t1) + else: + eta = np.rad2deg(keta - _t1) + phi = np.rad2deg(kphi - _t1) + chi = 2.0 * np.rad2deg(np.arcsin(np.sin(kappa / 2.0) * \ + np.sin(self.kalpha))) + + return eta, chi, phi + + def _calc_replace_angle_values(self , primaryAngles=None, + referenceAngles=None): + ''' + Fix a situation were some constant angle values have been + recorded incorrectly and need to be fixed. + :param primaryAngles: list of sample axis numbers to be handled by + the conversion + :param referenceAngles: list of reference angles to be used in angle + conversion + ''' + logger.info( "Running " + __name__) + angles = [] + logger.debug("referenceAngles " + str(referenceAngles)) + mappingAngles = self.instConfig.getSampleAngleMappingReferenceAngles() + + logger.debug( "mappingAngles" + str(mappingAngles)) + for ii in range(len(referenceAngles)): + replaceVal = \ + float(self.instConfig.getSampleAngleMappingReferenceAngleAttrib( \ + number= str(mappingAngles[ii]), \ + attribName='replaceValue')) + logger.debug( "primary Angles" + str(referenceAngles)) + angles.append(replaceVal* np.ones(len(referenceAngles[:,ii]),)) + logger.debug("Angles" + str( angles)) + return angles + + def fixGeoAngles(self, scan, angles): + ''' + Fix the angles using a user selected function. + :param scan: scan to set the angles for + :param angles: Array of angles to set for this scan + ''' + logger.debug( "starting " + __name__) + needToCorrect = False + refAngleNames = self.instConfig.getSampleAngleMappingReferenceAngles() + for refAngleName in refAngleNames: + alwaysFix = self.instConfig.getSampleAngleMappingAlwaysFix() + if refAngleName in scan.L or alwaysFix: + needToCorrect = True + + if needToCorrect: + logger.debug( __name__ + ": Fixing angles") + refAngles = self.getScanAngles(scan, refAngleNames) + primaryAngles = self.instConfig.getSampleAngleMappingPrimaryAngles() + functionName = self.instConfig.getSampleAngleMappingFunctionName() + functionModuleName = self.instConfig.getSampleAngleMappingFunctionModule() + logger.debug("sampleMappingFunction moduleName %s" % functionModuleName) + #Call a defined method to calculate angles from the reference angles. + moduleSource = self + if functionModuleName != None: + functionModule = importlib.import_module(functionModuleName) + logger.debug("dir(functionModule)" % dir(functionModule)) + moduleSource = functionModule + method = getattr(moduleSource, functionName) + fixedAngles = method(primaryAngles=primaryAngles, + referenceAngles=refAngles) + logger.debug ("fixed Angles: " + str(fixedAngles)) + for i in range(len(primaryAngles)): + logger.debug ("Fixing primaryAngles: %d " % primaryAngles[i]) + angles[:,primaryAngles[i]-1] = fixedAngles[i] + + def getGeoAngles(self, scan, angleNames): + """ + This function returns all of the geometry angles for the + for the scan as a N-by-num_geo array, where N is the number of scan + points and num_geo is the number of geometry motors. + """ +# scan = self.sd[scanNo] + geoAngles = self.getScanAngles(scan, angleNames) + if not (self.instConfig.getSampleAngleMappingFunctionName() == ""): + tb = None + try: + self.fixGeoAngles(scan, geoAngles) + except Exception as ex: + tb = traceback.format_exc() + raise RSMap3DException("Handling exception in getGeoAngles." + \ + "\n" + \ + str(ex) + \ + "\n" + \ + str(tb)) + logger.debug("getGeoAngles:\n" + str(geoAngles)) + return geoAngles + + def getUBMatrix(self, scan): + """ + Read UB matrix from the #G3 line from the spec file. + """ + try: + g3 = scan.G["G3"].strip().split() + g3 = np.array(list(map(float, g3))) + ub = g3.reshape(-1,3) + logger.debug("ub " +str(ub)) + return ub + except: + logger.error("Unable to read UB Matrix from G3") + logger.error( '-'*60) + traceback.print_exc(file=sys.stdout) + logger.error('-'*60) + + + def hotpixelkill(self, areaData): + """ + function to remove hot pixels from CCD frames + ADD REMOVE VALUES IF NEEDED! + :param areaData: area detector data + """ + + for pixel in self.getBadPixels(): + badLoc = pixel.getBadLocation() + replaceLoc = pixel.getReplacementLocation() + areaData[badLoc[0],badLoc[1]] = \ + areaData[replaceLoc[0],replaceLoc[1]] + + return areaData + + def loadSource(self, mapHKL=False): + ''' + This method does the work of loading data from the files. This has been + split off from the constructor to allow this to be threaded and later + canceled. + :param mapHKL: boolean to mark if the data should be mapped to HKL + ''' + # Load up the instrument configuration file + self.loadInstrumentXMLConfig() + #Load up the detector configuration file + self.loadDetectorXMLConfig() + + self.specFile = os.path.join(self.projectDir, self.projectName + \ + self.projectExt) + imageDir = os.path.join(self.projectDir, \ + IMAGE_DIR_MERGE_STR % self.projectName) + self.imageFileTmp = os.path.join(imageDir, \ + TIFF_FILE_MERGE_STR % + (self.projectName)) + # if needed load up the bad pixel file. + if not (self.badPixelFile is None): + + badPixelFile = PilatusBadPixelFile(self.badPixelFile) + self.badPixels = badPixelFile.getBadPixels() + + # id needed load the flat field file + if not (self.flatFieldFile is None): + self.flatFieldData = np.array(Image.open(self.flatFieldFile)).T + # Load scan information from the spec file + + try: + self.sd = SpecDataFile(self.specFile) + self.mapHKL = mapHKL + + maxScan = int(self.sd.getMaxScanNumber()) + logger.debug("Number of Scans" + str(maxScan)) + if self.scans is None: + self.scans = range(1, maxScan+1) + imagePath = os.path.join(self.projectDir, + IMAGE_DIR_MERGE_STR % self.projectName) + + self.imageBounds = {} + self.imageToBeUsed = {} + self.availableScans = [] + self.incidentEnergy = {} + self.ubMatrix = {} + self.scanType = {} + self.progress = 0 + self.progressInc = 1 + # Zero the progress bar at the beginning. + if self.progressUpdater is not None: + self.progressUpdater(self.progress, self.progressMax) + + + if self.mpiRank == 0: + scanWinSize = SCAN_WIN_SIZE + else: + scanWinSize = 0 + + scanWin = MPI.Win.Allocate(scanWinSize, comm=self.mpiComm) + + scanIdx = self.mpiRank + if self.mpiRank == 0: + scanWin.Lock(rank=0) + scanWin.Put([self.mpiComm.size.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Unlock(rank=0) + + self.mpiComm.Barrier() + + + while scanIdx < len(self.scans): + scan = self.scans[scanIdx] + if (self.cancelLoad): + self.cancelLoad = False + raise LoadCanceledException(CANCEL_STR) + + else: + if (os.path.exists(os.path.join(imagePath, \ + SCAN_NUMBER_MERGE_STR % scan))): + try: + curScan = self.sd.scans[str(scan)] + self.scanType[scan] = \ + self.sd.scans[str(scan)].scanCmd.split()[0] + angles = self.getGeoAngles(curScan, self.angleNames) + self.availableScans.append(scan) + if self.mapHKL==True: + self.ubMatrix[scan] = self.getUBMatrix(curScan) + if self.ubMatrix[scan] is None: + raise Sector33SpecFileException("UB matrix " + \ + "not found.") + else: + self.ubMatrix[scan] = None + self.incidentEnergy[scan] = 12398.4 /float(curScan.G['G4'].split()[3]) + _start_time = time.time() + self.imageBounds[scan] = \ + self.findImageQs(angles, \ + self.ubMatrix[scan], \ + self.incidentEnergy[scan]) + if self.progressUpdater is not None: + self.progressUpdater(self.progress, self.progressMax) + logger.info (('Elapsed time for Finding qs for scan %d: ' + + '%.3f seconds') % \ + (scan, (time.time() - _start_time))) + except ScanDataMissingException: + logger.error( "Scan " + str(scan) + " has no data") + #Make sure to show 100% completion + + scanBuff = bytearray(SCAN_WIN_SIZE) + scanWin.Lock(rank=0) + scanWin.Get([scanBuff, MPI.BYTE], target_rank=0) + scanIdx = int.from_bytes(scanBuff, 'little') + print(f'Recieved Scan {scanIdx}') + scanNext = scanIdx + 1 + scanWin.Put([scanNext.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) + scanWin.Unlock(rank=0) + + if self.progressUpdater is not None: + self.progressUpdater(self.progressMax, self.progressMax) + except IOError: + raise IOError( "Cannot open file " + str(self.specFile)) + if len(self.getAvailableScans()) == 0: + raise ScanDataMissingException("Could not find scan data for " + \ + "input file \n" + self.specFile + \ + "\nOne possible reason for this " + \ + "is that the image files are " + \ + "missing. Images are assumed " + \ + "to be in " + \ + os.path.join(self.projectDir, + IMAGE_DIR_MERGE_STR % self.projectName)) + + scanData = self.exportScans() + scanData = self.mpiMergeSources(scanData) + + scanData = self.mpiComm.bcast(scanData, root=0) + self.importScans(scanData) + + self.availableScanTypes = set(self.scanType.values()) + + + def mpiMergeSources(self, scanData): + """ + Merges data sources, combining their information till all data is collated at proc 0. + Similar to the upward execution of a distributed merge-sort. + Data can be shared back via the export commands. + """ + worldSize = self.mpiComm.Get_size() + + depth = math.floor(np.log2(worldSize)) + 1 + for currDepth in range(depth): + + # Exclude procs that have merged + if self.mpiRank % (2**currDepth) != 0: + break + + # Determine if sending or receiving + if (self.mpiRank / (2**currDepth)) % 2 == 0: + source = self.mpiRank + (2**currDepth) + + # Odd # world size + if source >= worldSize: + continue + + incomingScanData = self.mpiComm.recv(source=source) + for key in incomingScanData.keys(): + if type(incomingScanData[key]) is list: + scanData[key] += incomingScanData[key] + elif type(incomingScanData[key]) is dict: + if len(set(scanData[key].keys()).intersection(set(incomingScanData[key].keys()))) > 0: + raise ValueError(f"Unable to merge {key} dict due to overlapping keys") + scanData[key].update(incomingScanData[key]) + else: + raise NotImplementedError(f"Unable to merge {key}. Please implement merge here.") + + else: + dest = self.mpiRank - (2 ** currDepth) + self.mpiComm.send(scanData, dest=dest) + + return scanData + + + def exportScans(self): + """ + Returns loaded data as an MPI-safe object + """ + return { + 'imageBounds': self.imageBounds, + 'imageToBeUsed': self.imageToBeUsed, + 'availableScans':self.availableScans, + 'incidentEnergy': self.incidentEnergy, + 'ubMatrix':self.ubMatrix, + 'availableScanTypes': self.availableScanTypes + } + + def importScans(self, scanData): + """ + Imports exported scan data from another data source via an MPI-safe object. + """ + self.imageBounds = scanData['imageBounds'] + self.imageToBeUsed = scanData['imageToBeUsed'] + self.availableScans = scanData['availableScans'] + self.incidentEnergy = scanData['incidentEnergy'] + self.ubMatrix = scanData['ubMatrix'] + self.availableScanTypes = scanData['availableScanTypes'] + + def to_bool(self, value): + """ + Note this method found in answer to: + http://stackoverflow.com/questions/715417/converting-from-a-string-to-boolean-in-python + Converts 'something' to boolean. Raises exception if it gets a string + it doesn't handle. + Case is ignored for strings. These string values are handled: + True: 'True', "1", "TRue", "yes", "y", "t" + False: "", "0", "faLse", "no", "n", "f" + Non-string values are passed to bool. + """ + if type(value) == type(''): + if value.lower() in ("yes", "y", "true", "t", "1"): + return True + if value.lower() in ("no", "n", "false", "f", "0", ""): + return False + raise Exception('Invalid value for boolean conversion: ' + value) + return bool(value) + + def rawmap(self,scans, angdelta=[0,0,0,0,0], + adframes=None, mask = None): + """ + read ad frames and and convert them in reciprocal space + angular coordinates are taken from the spec file + or read from the edf file header when no scan number is given (scannr=None) + """ + + if mask is None: + mask_was_none = True + #mask = [True] * len(self.getImageToBeUsed()[scans[0]]) + else: + mask_was_none = False + #sd = spec.SpecDataFile(self.specFile) + intensity = np.array([]) + + # fourc goniometer in fourc coordinates + # convention for coordinate system: + # x: upwards; + # y: along the incident beam; + # z: "outboard" (makes coordinate system right-handed). + # QConversion will set up the goniometer geometry. + # So the first argument describes the sample rotations, the second the + # detector rotations and the third the primary beam direction. + qconv = xu.experiment.QConversion(self.getSampleCircleDirections(), \ + self.getDetectorCircleDirections(), \ + self.getPrimaryBeamDirection()) + + # define experimental class for angle conversion + # + # ipdir: inplane reference direction (ipdir points into the primary beam + # direction at zero angles) + # ndir: surface normal of your sample (ndir points in a direction + # perpendicular to the primary beam and the innermost detector + # rotation axis) + en = self.getIncidentEnergy() + hxrd = xu.HXRD(self.getInplaneReferenceDirection(), \ + self.getSampleSurfaceNormalDirection(), \ + en=en[self.getAvailableScans()[0]], \ + qconv=qconv) + + + # initialize area detector properties + if (self.getDetectorPixelWidth() != None ) and \ + (self.getDistanceToDetector() != None): + hxrd.Ang2Q.init_area(self.getDetectorPixelDirection1(), \ + self.getDetectorPixelDirection2(), \ + cch1=self.getDetectorCenterChannel()[0], \ + cch2=self.getDetectorCenterChannel()[1], \ + Nch1=self.getDetectorDimensions()[0], \ + Nch2=self.getDetectorDimensions()[1], \ + pwidth1=self.getDetectorPixelWidth()[0], \ + pwidth2=self.getDetectorPixelWidth()[1], \ + distance=self.getDistanceToDetector(), \ + Nav=self.getNumPixelsToAverage(), \ + roi=self.getDetectorROI()) + else: + hxrd.Ang2Q.init_area(self.getDetectorPixelDirection1(), \ + self.getDetectorPixelDirection2(), \ + cch1=self.getDetectorCenterChannel()[0], \ + cch2=self.getDetectorCenterChannel()[1], \ + Nch1=self.getDetectorDimensions()[0], \ + Nch2=self.getDetectorDimensions()[1], \ + chpdeg1=self.getDetectorChannelsPerDegree()[0], \ + chpdeg2=self.getDetectorChannelsPerDegree()[1], \ + Nav=self.getNumPixelsToAverage(), + roi=self.getDetectorROI()) + + angleNames = self.getAngles() + scanAngle = {} + for i in range(len(angleNames)): + scanAngle[i] = np.array([]) + + offset = 0 + imageToBeUsed = self.getImageToBeUsed() + monitorName = self.getMonitorName() + monitorScaleFactor = self.getMonitorScaleFactor() + filterName = self.getFilterName() + filterScaleFactor = self.getFilterScaleFactor() + for scannr in scans: + if self.haltMap: + raise ProcessCanceledException("Process Canceled") + scan = self.sd.scans[str(scannr)] + angles = self.getGeoAngles(scan, angleNames) + scanAngle1 = {} + scanAngle2 = {} + for i in range(len(angleNames)): + scanAngle1[i] = angles[:,i] + scanAngle2[i] = [] + if monitorName != None: + monitor_data = scan.data.get(monitorName) + if monitor_data is None: + raise IOError("Did not find Monitor source '" + \ + monitorName + \ + "' in the Spec file. Make sure " + \ + "monitorName is correct in the " + \ + "instrument Config file") + if filterName != None: + filter_data = scan.data.get(filterName) + if filter_data is None: + raise IOError("Did not find filter source '" + \ + filterName + \ + "' in the Spec file. Make sure " + \ + "filterName is correct in the " + \ + "instrument Config file") + # read in the image data + arrayInitializedForScan = False + foundIndex = 0 + + if mask_was_none: + mask = [True] * len(self.getImageToBeUsed()[scannr]) + + for ind in range(len(scan.data[list(scan.data.keys())[0]])): + if imageToBeUsed[scannr][ind] and mask[ind]: + # read tif image + im = Image.open(self.imageFileTmp % (scannr, scannr, ind)) + img = np.array(im.getdata()).reshape(im.size[1],im.size[0]).T + img = self.hotpixelkill(img) + ff_data = self.getFlatFieldData() + if not (ff_data is None): + img = img * ff_data + # reduce data siz + img2 = xu.blockAverage2D(img, + self.getNumPixelsToAverage()[0], \ + self.getNumPixelsToAverage()[1], \ + roi=self.getDetectorROI()) + + # apply intensity corrections + if monitorName != None: + img2 = img2 / monitor_data[ind] * monitorScaleFactor + if filterName != None: + img2 = img2 / filter_data[ind] * filterScaleFactor + + # initialize data array + if not arrayInitializedForScan: + imagesToProcess = [imageToBeUsed[scannr][i] and mask[i] for i in range(len(imageToBeUsed[scannr]))] + if not intensity.shape[0]: + intensity = np.zeros((np.count_nonzero(imagesToProcess),) + img2.shape) + arrayInitializedForScan = True + else: + offset = intensity.shape[0] + intensity = np.concatenate( + (intensity, + (np.zeros((np.count_nonzero(imagesToProcess),) + img2.shape))), + axis=0) + arrayInitializedForScan = True + # add data to intensity array + intensity[foundIndex+offset,:,:] = img2 + for i in range(len(angleNames)): +# logger.debug("appending angles to angle2 " + +# str(scanAngle1[i][ind])) + scanAngle2[i].append(scanAngle1[i][ind]) + foundIndex += 1 + if len(scanAngle2[0]) > 0: + for i in range(len(angleNames)): + scanAngle[i] = \ + np.concatenate((scanAngle[i], np.array(scanAngle2[i])), \ + axis=0) + # transform scan angles to reciprocal space coordinates for all detector pixels + angleList = [] + for i in range(len(angleNames)): + angleList.append(scanAngle[i]) + if self.ubMatrix[scans[0]] is None: + qx, qy, qz = hxrd.Ang2Q.area(*angleList, \ + roi=self.getDetectorROI(), + Nav=self.getNumPixelsToAverage()) + else: + qx, qy, qz = hxrd.Ang2Q.area(*angleList, \ + roi=self.getDetectorROI(), + Nav=self.getNumPixelsToAverage(), \ + UB = self.ubMatrix[scans[0]]) + + + # apply selected transform + qxTrans, qyTrans, qzTrans = \ + self.transform.do3DTransform(qx, qy, qz) + + + return qxTrans, qyTrans, qzTrans, intensity + + +class LoadCanceledException(RSMap3DException): + ''' + Exception Thrown when loading data is canceled. + ''' + def __init__(self, message): + super(LoadCanceledException, self).__init__(message) + +class Sector33SpecFileException(RSMap3DException): + ''' + Exception class to be raised if there is a problem loading information + from a spec file + file + ''' + def __init__(self, message): + super(Sector33SpecFileException, self).__init__(message) + + + From 271915f731aa450e4c7e77cd2cfe5f64c88788c0 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Thu, 15 Sep 2022 15:38:12 -0500 Subject: [PATCH 12/20] Change back to time.log output --- Scripts/mpiMapSpecAngleScan.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index 7ef70b8..f155379 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -20,13 +20,6 @@ from rsMap3D.gui.rsm3dcommonstrings import BINARY_OUTPUT from rsMap3D.transforms.unitytransform3d import UnityTransform3D from rsMap3D.mappers.output.vtigridwriter import VTIGridWriter -import logging - -logging.basicConfig(format='%(asctime)s %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p', - filename='mpiMap.log', - level=logging.INFO) - from mpi4py import MPI from mpi4py.futures import MPICommExecutor @@ -46,7 +39,8 @@ def updateMapperProgress(value1): if mpiRank == 0: startTime = datetime.datetime.now() - logging.info('Starting Mapping') + with open('time.log', 'a') as time_log: + time_log.write(f'Start: {startTime}\n') # @@ -114,6 +108,9 @@ def updateMapperProgress(value1): ds.loadSource(mapHKL=mapHKL) +if mpiRank == 0: + with open('time.log', 'a') as time_log: + time_log.write(f'Source Load Time: {datetime.datetime.now()}\n') ds.setRangeBounds(ds.getOverallRanges()) imageToBeUsed = ds.getImageToBeUsed() @@ -133,6 +130,7 @@ def updateMapperProgress(value1): gridMapper.doMap() if mpiRank == 0: - logging.info('Ended map') - endTime = datetime.datetime.now() - logging.info(f'Diff: {endTime - startTime}\n') \ No newline at end of file + with open('time.log', 'a') as time_log: + endTime = datetime.datetime.now() + time_log.write(f'End: {endTime}\n') + time_log.write(f'Diff: {endTime - startTime}\n') \ No newline at end of file From 5741b24ba5a1f2bef213d1dff6a22a97ca1cae9d Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Fri, 16 Sep 2022 10:48:15 -0500 Subject: [PATCH 13/20] Refactor printing to display proc output --- Scripts/mpiMapSpecAngleScan.py | 2 +- .../datasource/MpiSector33SpecDataSource.py | 12 ++++------ rsMap3D/mappers/mpigridmapper.py | 22 +++++-------------- 3 files changed, 10 insertions(+), 26 deletions(-) diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index f155379..49da31f 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -104,7 +104,7 @@ def updateMapperProgress(value1): pixelsToAverage=bin, scanList= scanRange, appConfig=appConfig) ds.setCurrentDetector(detectorName) -ds.setProgressUpdater(updateDataSourceProgress) +#ds.setProgressUpdater(updateDataSourceProgress) ds.loadSource(mapHKL=mapHKL) diff --git a/rsMap3D/datasource/MpiSector33SpecDataSource.py b/rsMap3D/datasource/MpiSector33SpecDataSource.py index 60d8bd9..33ab779 100644 --- a/rsMap3D/datasource/MpiSector33SpecDataSource.py +++ b/rsMap3D/datasource/MpiSector33SpecDataSource.py @@ -276,9 +276,6 @@ def loadSource(self, mapHKL=False): self.scanType = {} self.progress = 0 self.progressInc = 1 - # Zero the progress bar at the beginning. - if self.progressUpdater is not None: - self.progressUpdater(self.progress, self.progressMax) if self.mpiRank == 0: @@ -325,8 +322,6 @@ def loadSource(self, mapHKL=False): self.findImageQs(angles, \ self.ubMatrix[scan], \ self.incidentEnergy[scan]) - if self.progressUpdater is not None: - self.progressUpdater(self.progress, self.progressMax) logger.info (('Elapsed time for Finding qs for scan %d: ' + '%.3f seconds') % \ (scan, (time.time() - _start_time))) @@ -338,13 +333,14 @@ def loadSource(self, mapHKL=False): scanWin.Lock(rank=0) scanWin.Get([scanBuff, MPI.BYTE], target_rank=0) scanIdx = int.from_bytes(scanBuff, 'little') - print(f'Recieved Scan {scanIdx}') + if scanIdx < len(self.scans): + print(f'Proc {self.mpiRank} Loading Scan {scanIdx+1}/{len(self.scans)}') + else: + print(f'Proc {self.mpiRank} finished load. Beginning merge.') scanNext = scanIdx + 1 scanWin.Put([scanNext.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) scanWin.Unlock(rank=0) - if self.progressUpdater is not None: - self.progressUpdater(self.progressMax, self.progressMax) except IOError: raise IOError( "Cannot open file " + str(self.specFile)) if len(self.getAvailableScans()) == 0: diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py index 828207a..318f6b2 100644 --- a/rsMap3D/mappers/mpigridmapper.py +++ b/rsMap3D/mappers/mpigridmapper.py @@ -81,7 +81,6 @@ def processMap(self, **kwargs): True) imageToBeUsed = self.dataSource.getImageToBeUsed() - progress = 0 scans = self.dataSource.getAvailableScans() @@ -102,7 +101,6 @@ def processMap(self, **kwargs): self.mpiComm.Barrier() while scanIdx < len(scans): - print(f'Starting Scan {scanIdx} of {len(scans)}') scan = scans[scanIdx] if True in imageToBeUsed[scan]: @@ -116,9 +114,6 @@ def processMap(self, **kwargs): try: gridder(qx, qy, qz, intensity) - progress += 100 - if self.progressUpdater is not None: - self.progressUpdater(progress) except InputError as ex: print ("Wrong Input to gridder") @@ -129,9 +124,6 @@ def processMap(self, **kwargs): raise InputError(ex) - progress += 100 - if self.progressUpdater is not None: - self.progressUpdater(progress) else: nPasses = int(imageSize*4*numImages/ maxImageMem + 1) for thisPass in range(nPasses): @@ -148,9 +140,6 @@ def processMap(self, **kwargs): gridder(qx, qy, qz, intensity) - progress += 1.0/nPasses* 100.0 - if self.progressUpdater is not None: - self.progressUpdater(progress) except InputError as ex: print ("Wrong Input to gridder") print ("qx Size: " + str( qx.shape)) @@ -158,22 +147,21 @@ def processMap(self, **kwargs): print ("qz Size: " + str( qz.shape)) print ("intensity Size: " + str(intensity.shape)) raise InputError(ex) - else: - progress += 1.0/nPasses* 100.0 - if self.progressUpdater is not None: - self.progressUpdater(progress) scanBuff = bytearray(SCAN_WIN_SIZE) scanWin.Lock(rank=0) scanWin.Get([scanBuff, MPI.BYTE], target_rank=0) scanIdx = int.from_bytes(scanBuff, 'little') - print(f'Recieved Scan {scanIdx}') + if scanIdx < len(scans): + print(f'Proc {self.mpiRank} Gridding {scanIdx+1}/{len(scans)}') + else: + print(f'Proc {self.mpiRank} finished grid. Beginning merge.') + scanNext = scanIdx + 1 scanWin.Put([scanNext.to_bytes(SCAN_WIN_SIZE, 'little'), MPI.BYTE], target_rank=0) scanWin.Unlock(rank=0) - self.progressUpdater(100.0) gridder = self.mergeGridders(gridder) From dddb7a7b47dbbb66be1cba7ff0b4b2687d2f330a Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Fri, 23 Sep 2022 08:54:54 -0500 Subject: [PATCH 14/20] Add beginning gridding print --- rsMap3D/mappers/mpigridmapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsMap3D/mappers/mpigridmapper.py b/rsMap3D/mappers/mpigridmapper.py index 318f6b2..0ea7483 100644 --- a/rsMap3D/mappers/mpigridmapper.py +++ b/rsMap3D/mappers/mpigridmapper.py @@ -99,7 +99,7 @@ def processMap(self, **kwargs): scanWin.Unlock(rank=0) self.mpiComm.Barrier() - + print(f'Proc {self.mpiRank} Beginning Gridding {scanIdx+1}/{len(scans)}') while scanIdx < len(scans): scan = scans[scanIdx] From 251ff4f4c939034e9676a936c0583c2c45a012b1 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Wed, 28 Sep 2022 09:23:30 -0500 Subject: [PATCH 15/20] Add proc safety and argparse to mpi file --- Scripts/mpiMapSpecAngleScan.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/Scripts/mpiMapSpecAngleScan.py b/Scripts/mpiMapSpecAngleScan.py index 49da31f..5222f10 100644 --- a/Scripts/mpiMapSpecAngleScan.py +++ b/Scripts/mpiMapSpecAngleScan.py @@ -8,9 +8,10 @@ import os import numpy as np -import xrayutilities as xu import json import datetime +import argparse + from rsMap3D.datasource.MpiSector33SpecDataSource import MPISector33SpecDataSource from rsMap3D.datasource.DetectorGeometryForXrayutilitiesReader import DetectorGeometryForXrayutilitiesReader as detReader from rsMap3D.utils.srange import srange @@ -27,13 +28,23 @@ mpiComm = MPI.COMM_WORLD mpiRank = mpiComm.Get_rank() +def parseArgs(): + parser = argparse.ArgumentParser(description='Default MPI run script. Expects a json config file pointed at the data.') + parser.add_argument('configPath', + nargs='?', + default=os.path.join(os.getcwd(), 'config.json'), + help='Path to config file. If none supplied, directs to a config.json located in CWD') + return parser.parse_args() + +args = parseArgs() + def updateDataSourceProgress(value1, value2): print("DataSource Progress %s/%s" % (value1, value2)) def updateMapperProgress(value1): print("Mapper Progress %s" % (value1)) -with open('config.json', 'r') as config_f: +with open(args.configPath, 'r') as config_f: config = json.load(config_f) @@ -51,6 +62,9 @@ def updateMapperProgress(value1): specFile = config["spec_file"] scanRange = srange(config["scan_range"]).list() +if len(scanRange) < mpiComm.Get_size(): + raise ValueError("Too many processes! Reduce proc count to <= number of scans.") + # From 8745a783da336350f14a631c84d16893581c6907 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Wed, 28 Sep 2022 10:42:59 -0500 Subject: [PATCH 16/20] Add MPI info docs --- docs/MPI_Info.md | 116 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 docs/MPI_Info.md diff --git a/docs/MPI_Info.md b/docs/MPI_Info.md new file mode 100644 index 0000000..a299b2d --- /dev/null +++ b/docs/MPI_Info.md @@ -0,0 +1,116 @@ +# Overview + +This document covers the topics related to usage of the MPI and parallelized components of RSMap3d. + +# Installation + +NOTE: Requires MPI to have Remote Memory Access capabilities. Highly recommended that installations implement MPI >= 3.0. + +## Recommended Installation Through Conda + +1. Create a default environment with python=3.6 `conda create -y -n rsMapMPI python=3.6` +2. Activate Environment: `conda activate rsMapMPI` +3. Install MPI and MPI4py with conda: `conda install -y -c conda-forge mpi4py openmpi` +4. Install requirements with pip: `pip install PyQt5==5.15.6 xrayutilities==1.7.3 spec2nexus==2021.1.11 vtk==9.1.0` +5. Install rsMap from root dir: `pip install .` + +# Usage + +MPI is implemented through a similar interface to the standard RSMap library files. Due to MPIRun launching many processes in parallel, the GUI is not supported for MPI. A script has been provded as a starting point. + +## Running the Program + +The program can be run via the local mpi run command. In the default installation this is `mpirun -n [num procs] python [script].py`. + +On ALCF systems, this would be run with `aprun -n [num procs] -N [num procs per node] python [script].py` + +## Script Usage + +A script `Scripts/mpiMapSpecAngleScan.py` has been provided as a starting point. The script takes in a .json config file pointing to data files. The path can be provided as a command line argument: `mpiMapSpecAngleScan.py path/to/config.json`. If no path is provided, the script searches for `config.json` in the CWD. + +The config format is as follows: +```json +{ + "scan_range": "[int]-[int]", + "project_dir":"path/to/project", + "spec_file":"spec_file (NOTE: relative to project dir)", + "detector_config":"dtc_file (NOTE: relative to project dir)", + "instrument_config":"inst_file (NOTE: relative to project dir)" +} + +``` + + +## Class API Changes + +New MPI classes based on original classes have been implemented. There are a few sharp edges due to synchronization. They are located in files with the `mpi` prefix. The constructors of MPI classes will require an additional argument: the `mpiComm`. When creating your own scripts, this can be created via the following block of code. It is also highly recommended to use the process rank to prevent the program from repeating script work. + +```python +from mpi4py import MPI + +mpiComm = MPI.COMM_WORLD +mpiRank = mpiComm.Get_rank() + +if mpiRank == 0: # 0th process only + # Do singleton logging, work, etc. +``` + +Operations like `datasource.loadSource` and `gridMapper.doMap()` are now **SYNCHRONOUS**. This means ALL scripts have to enter the same branch for them to complete. The program will _hang_ (not exit) at synchronization points. As an example: + +```python +if mpiRank == 2: + time.sleep(10) + +gridMapper.doMap() # All process will be forced to wait till process 2 enters this function in 10 seconds + +if mpiRank != 2: # This will cause the program to hang indefinitely + gridMapper.doMap() +``` + + + + +# Sharp Edges + +A major sharp edge is `num scans <= num cores`. If a run only uses 10 scan, no more than 10 cores may be applied. Therefore this parallelization only improves larger runs. The `Scripts/mpiMapSpecAngleScan.py` script checks for this and raises an error if otherwise. It's highly recommended that your scripts do the same. + +Parallelization also introduces sharp edges related to the gridder and loading of data. + +## Gridder + +Some sharp edges appear around the gridder object. + +Two gridder settings are required for this form of parallelization to work: + +1. Normalization must be off. This is the default in xrayutilities==1.7.3 +2. The gridder must be a static size. This is hard-coded to false in the mpigridmapper.py file. + +The gridder is passed through MPI channels `nlog(n)` times (where n = num procs) during the program. An extremely large gridder size may increase runtimes. + +## Loading + +Loading of data into the datasource is parallelized. As such, a datasource merging operation is required. This is implemented via the following block and functions in `MpiSector33SpecDataSource.py`: + +```python +scanData = self.exportScans() +scanData = self.mpiMergeSources(scanData) + +scanData = self.mpiComm.bcast(scanData, root=0) +self.importScans(scanData) +``` + +If your implementation requires additional data to be loaded, `exportScans` and `importScans` will need to be modified. If the data is not list or dict based, `mpiMergeSources` will likely need modification. Custom merge operations can be added to the type conditional in the merge method. Any new data being passed through MPI must be able to be pickled. + +# Performance Characteristics + +This section will cover expected program performance and recommendations for optimizing runs. + +## Performance Factors + +The program adds a `nlog(n)` operation where `n = num procs` to both loading and gridding. The length of a single operation depends primarily on the size of data being passed through a MPI pipe. This means single-node runs will perform this operation faster than multi-node runs. + +Otherwise, gridding is expected to scale with a factor of `1/(2^n)`. Loading is expected to scale roughly the same, but may be limited by disk access speeds. + +## Recommended Settings + +The gridding operation is additionally multi-threaded via xrayutilities so it is recommended that num procs < num cores. Through testing we recommend `num_procs = num_cores / [5-10]` as a reasonable starting point. Experiment with your system to find what works best. \ No newline at end of file From 90f45597b38f5cdb515945c4e7f601dab1ebb112 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Wed, 28 Sep 2022 12:40:08 -0500 Subject: [PATCH 17/20] Apply edits to MPI documentation --- docs/MPI_Info.md | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/docs/MPI_Info.md b/docs/MPI_Info.md index a299b2d..532b319 100644 --- a/docs/MPI_Info.md +++ b/docs/MPI_Info.md @@ -20,13 +20,13 @@ MPI is implemented through a similar interface to the standard RSMap library fil ## Running the Program -The program can be run via the local mpi run command. In the default installation this is `mpirun -n [num procs] python [script].py`. +The program can be run via the local mpi run command. In the default installation this is `mpirun -n [num_procs] python [script].py` -On ALCF systems, this would be run with `aprun -n [num procs] -N [num procs per node] python [script].py` +On ALCF systems, this would be run with `aprun -n [num_procs] -N [num_procs per node] python [script].py` ## Script Usage -A script `Scripts/mpiMapSpecAngleScan.py` has been provided as a starting point. The script takes in a .json config file pointing to data files. The path can be provided as a command line argument: `mpiMapSpecAngleScan.py path/to/config.json`. If no path is provided, the script searches for `config.json` in the CWD. +A script `Scripts/mpiMapSpecAngleScan.py` has been provided as a starting point. The script takes in a .json config file pointing to data files. The path can be provided as a command line argument: `mpiMapSpecAngleScan.py path/to/config.json` If no path is provided, the script searches for `config.json` in the CWD. The config format is as follows: ```json @@ -43,7 +43,7 @@ The config format is as follows: ## Class API Changes -New MPI classes based on original classes have been implemented. There are a few sharp edges due to synchronization. They are located in files with the `mpi` prefix. The constructors of MPI classes will require an additional argument: the `mpiComm`. When creating your own scripts, this can be created via the following block of code. It is also highly recommended to use the process rank to prevent the program from repeating script work. +MPI is implemented in new classes based on the original classes.Due to sharp edges, they are located in source files with the `mpi` prefix. The constructors of MPI classes will require an additional argument: the `mpiComm`. When creating your own scripts, this can be created via the following block of code. It is also highly recommended to use the process rank to prevent the program from repeating script work. ```python from mpi4py import MPI @@ -55,7 +55,7 @@ if mpiRank == 0: # 0th process only # Do singleton logging, work, etc. ``` -Operations like `datasource.loadSource` and `gridMapper.doMap()` are now **SYNCHRONOUS**. This means ALL scripts have to enter the same branch for them to complete. The program will _hang_ (not exit) at synchronization points. As an example: +Operations like `dataSource.loadSource()` and `gridMapper.doMap()` are now **SYNCHRONOUS**. This means ALL scripts have to enter the same branch for them to complete. The program will _hang_ (not exit) at synchronization points. As an example: ```python if mpiRank == 2: @@ -78,14 +78,12 @@ Parallelization also introduces sharp edges related to the gridder and loading o ## Gridder -Some sharp edges appear around the gridder object. - Two gridder settings are required for this form of parallelization to work: -1. Normalization must be off. This is the default in xrayutilities==1.7.3 -2. The gridder must be a static size. This is hard-coded to false in the mpigridmapper.py file. +1. Normalization must be off. This is the default in xrayutilities==1.7.3. +2. The gridder must be a static size. This is hard-coded in the mpigridmapper.py file. -The gridder is passed through MPI channels `nlog(n)` times (where n = num procs) during the program. An extremely large gridder size may increase runtimes. +The gridder is passed through MPI channels `nlog(n)` times (where n = num_procs) during the program. An extremely large gridder size may increase runtimes. ## Loading @@ -99,7 +97,7 @@ scanData = self.mpiComm.bcast(scanData, root=0) self.importScans(scanData) ``` -If your implementation requires additional data to be loaded, `exportScans` and `importScans` will need to be modified. If the data is not list or dict based, `mpiMergeSources` will likely need modification. Custom merge operations can be added to the type conditional in the merge method. Any new data being passed through MPI must be able to be pickled. +If your implementation requires additional data to be loaded, `exportScans` and `importScans` will need to be modified. If the data is not list or dict based, `mpiMergeSources` will need modification. Custom merge operations can be added to the conditional in the merge method. Any new data being passed through MPI must be able to be pickled. # Performance Characteristics @@ -107,10 +105,10 @@ This section will cover expected program performance and recommendations for opt ## Performance Factors -The program adds a `nlog(n)` operation where `n = num procs` to both loading and gridding. The length of a single operation depends primarily on the size of data being passed through a MPI pipe. This means single-node runs will perform this operation faster than multi-node runs. +The program adds a `nlog(n)` operation where `n = num_procs` to both loading and gridding. The length of a single operation depends primarily on the size of data being passed through a MPI pipe. This means single-node runs will perform this operation faster than multi-node runs. -Otherwise, gridding is expected to scale with a factor of `1/(2^n)`. Loading is expected to scale roughly the same, but may be limited by disk access speeds. +Otherwise, gridding is expected to scale with a factor of `1/(2^n)`. Loading is expected to scale the same, but may be limited by disk access speeds. ## Recommended Settings -The gridding operation is additionally multi-threaded via xrayutilities so it is recommended that num procs < num cores. Through testing we recommend `num_procs = num_cores / [5-10]` as a reasonable starting point. Experiment with your system to find what works best. \ No newline at end of file +The gridding operation is additionally multi-threaded via xrayutilities so it is recommended that num_procs < num cores. Through testing we recommend `num_procs = num_cores / [5-10]` as a reasonable starting point. Experiment with your system to find what works best. \ No newline at end of file From e5c41f0b9cce5d2419b97f2016cab546b22b3c99 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Wed, 28 Sep 2022 13:43:56 -0500 Subject: [PATCH 18/20] More doc fixes --- docs/MPI_Info.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/MPI_Info.md b/docs/MPI_Info.md index 532b319..987cedf 100644 --- a/docs/MPI_Info.md +++ b/docs/MPI_Info.md @@ -72,7 +72,7 @@ if mpiRank != 2: # This will cause the program to hang indefinitely # Sharp Edges -A major sharp edge is `num scans <= num cores`. If a run only uses 10 scan, no more than 10 cores may be applied. Therefore this parallelization only improves larger runs. The `Scripts/mpiMapSpecAngleScan.py` script checks for this and raises an error if otherwise. It's highly recommended that your scripts do the same. +A major sharp edge is `num scans >= num cores`. If a run only uses 10 scans, no more than 10 cores may be applied. Therefore this parallelization only improves larger runs. The `Scripts/mpiMapSpecAngleScan.py` script checks for this and raises an error if otherwise. It's highly recommended that your scripts do the same. Parallelization also introduces sharp edges related to the gridder and loading of data. @@ -81,7 +81,7 @@ Parallelization also introduces sharp edges related to the gridder and loading o Two gridder settings are required for this form of parallelization to work: 1. Normalization must be off. This is the default in xrayutilities==1.7.3. -2. The gridder must be a static size. This is hard-coded in the mpigridmapper.py file. +2. The gridder must be a static size. This is hard-coded false in the mpigridmapper.py file. The gridder is passed through MPI channels `nlog(n)` times (where n = num_procs) during the program. An extremely large gridder size may increase runtimes. From 716272edea38e8919229f59806a502210b356687 Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Mon, 3 Oct 2022 13:28:53 -0500 Subject: [PATCH 19/20] Modify MPI recommendation based on multithreading experiments --- docs/MPI_Info.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/MPI_Info.md b/docs/MPI_Info.md index 987cedf..7fbfc01 100644 --- a/docs/MPI_Info.md +++ b/docs/MPI_Info.md @@ -111,4 +111,4 @@ Otherwise, gridding is expected to scale with a factor of `1/(2^n)`. Loading is ## Recommended Settings -The gridding operation is additionally multi-threaded via xrayutilities so it is recommended that num_procs < num cores. Through testing we recommend `num_procs = num_cores / [5-10]` as a reasonable starting point. Experiment with your system to find what works best. \ No newline at end of file +While multi-threading is applied by xrayutilities, it only affects about <5% of execution time. As such, the main parameter to vary is the number of processes. Through testing we recommend `num_procs = num MPI slots` as a reasonable starting point. Experiment with your system to find what works best. \ No newline at end of file From 82625e31396d4394a5bd090479cc126a63d3dd8e Mon Sep 17 00:00:00 2001 From: linked-liszt Date: Mon, 3 Oct 2022 15:55:02 -0500 Subject: [PATCH 20/20] Add .rst file to sphinx docs --- docs/MPI_Info.md | 114 ------------------- rsMap3D/docs/Tutorial/MPIAcceleration.rst | 129 ++++++++++++++++++++++ 2 files changed, 129 insertions(+), 114 deletions(-) delete mode 100644 docs/MPI_Info.md create mode 100644 rsMap3D/docs/Tutorial/MPIAcceleration.rst diff --git a/docs/MPI_Info.md b/docs/MPI_Info.md deleted file mode 100644 index 7fbfc01..0000000 --- a/docs/MPI_Info.md +++ /dev/null @@ -1,114 +0,0 @@ -# Overview - -This document covers the topics related to usage of the MPI and parallelized components of RSMap3d. - -# Installation - -NOTE: Requires MPI to have Remote Memory Access capabilities. Highly recommended that installations implement MPI >= 3.0. - -## Recommended Installation Through Conda - -1. Create a default environment with python=3.6 `conda create -y -n rsMapMPI python=3.6` -2. Activate Environment: `conda activate rsMapMPI` -3. Install MPI and MPI4py with conda: `conda install -y -c conda-forge mpi4py openmpi` -4. Install requirements with pip: `pip install PyQt5==5.15.6 xrayutilities==1.7.3 spec2nexus==2021.1.11 vtk==9.1.0` -5. Install rsMap from root dir: `pip install .` - -# Usage - -MPI is implemented through a similar interface to the standard RSMap library files. Due to MPIRun launching many processes in parallel, the GUI is not supported for MPI. A script has been provded as a starting point. - -## Running the Program - -The program can be run via the local mpi run command. In the default installation this is `mpirun -n [num_procs] python [script].py` - -On ALCF systems, this would be run with `aprun -n [num_procs] -N [num_procs per node] python [script].py` - -## Script Usage - -A script `Scripts/mpiMapSpecAngleScan.py` has been provided as a starting point. The script takes in a .json config file pointing to data files. The path can be provided as a command line argument: `mpiMapSpecAngleScan.py path/to/config.json` If no path is provided, the script searches for `config.json` in the CWD. - -The config format is as follows: -```json -{ - "scan_range": "[int]-[int]", - "project_dir":"path/to/project", - "spec_file":"spec_file (NOTE: relative to project dir)", - "detector_config":"dtc_file (NOTE: relative to project dir)", - "instrument_config":"inst_file (NOTE: relative to project dir)" -} - -``` - - -## Class API Changes - -MPI is implemented in new classes based on the original classes.Due to sharp edges, they are located in source files with the `mpi` prefix. The constructors of MPI classes will require an additional argument: the `mpiComm`. When creating your own scripts, this can be created via the following block of code. It is also highly recommended to use the process rank to prevent the program from repeating script work. - -```python -from mpi4py import MPI - -mpiComm = MPI.COMM_WORLD -mpiRank = mpiComm.Get_rank() - -if mpiRank == 0: # 0th process only - # Do singleton logging, work, etc. -``` - -Operations like `dataSource.loadSource()` and `gridMapper.doMap()` are now **SYNCHRONOUS**. This means ALL scripts have to enter the same branch for them to complete. The program will _hang_ (not exit) at synchronization points. As an example: - -```python -if mpiRank == 2: - time.sleep(10) - -gridMapper.doMap() # All process will be forced to wait till process 2 enters this function in 10 seconds - -if mpiRank != 2: # This will cause the program to hang indefinitely - gridMapper.doMap() -``` - - - - -# Sharp Edges - -A major sharp edge is `num scans >= num cores`. If a run only uses 10 scans, no more than 10 cores may be applied. Therefore this parallelization only improves larger runs. The `Scripts/mpiMapSpecAngleScan.py` script checks for this and raises an error if otherwise. It's highly recommended that your scripts do the same. - -Parallelization also introduces sharp edges related to the gridder and loading of data. - -## Gridder - -Two gridder settings are required for this form of parallelization to work: - -1. Normalization must be off. This is the default in xrayutilities==1.7.3. -2. The gridder must be a static size. This is hard-coded false in the mpigridmapper.py file. - -The gridder is passed through MPI channels `nlog(n)` times (where n = num_procs) during the program. An extremely large gridder size may increase runtimes. - -## Loading - -Loading of data into the datasource is parallelized. As such, a datasource merging operation is required. This is implemented via the following block and functions in `MpiSector33SpecDataSource.py`: - -```python -scanData = self.exportScans() -scanData = self.mpiMergeSources(scanData) - -scanData = self.mpiComm.bcast(scanData, root=0) -self.importScans(scanData) -``` - -If your implementation requires additional data to be loaded, `exportScans` and `importScans` will need to be modified. If the data is not list or dict based, `mpiMergeSources` will need modification. Custom merge operations can be added to the conditional in the merge method. Any new data being passed through MPI must be able to be pickled. - -# Performance Characteristics - -This section will cover expected program performance and recommendations for optimizing runs. - -## Performance Factors - -The program adds a `nlog(n)` operation where `n = num_procs` to both loading and gridding. The length of a single operation depends primarily on the size of data being passed through a MPI pipe. This means single-node runs will perform this operation faster than multi-node runs. - -Otherwise, gridding is expected to scale with a factor of `1/(2^n)`. Loading is expected to scale the same, but may be limited by disk access speeds. - -## Recommended Settings - -While multi-threading is applied by xrayutilities, it only affects about <5% of execution time. As such, the main parameter to vary is the number of processes. Through testing we recommend `num_procs = num MPI slots` as a reasonable starting point. Experiment with your system to find what works best. \ No newline at end of file diff --git a/rsMap3D/docs/Tutorial/MPIAcceleration.rst b/rsMap3D/docs/Tutorial/MPIAcceleration.rst new file mode 100644 index 0000000..4b59d2a --- /dev/null +++ b/rsMap3D/docs/Tutorial/MPIAcceleration.rst @@ -0,0 +1,129 @@ +MPI Acceleration Info +===================== + +This document covers the topics related to usage of the MPI and parallelized components of RSMap3d. + +Installation +------------ + +NOTE: Requires MPI to have Remote Memory Access capabilities. Highly recommended that installations implement MPI >= 3.0. + +Recommended Installation Through Conda +`````````````````````````````````````` + +1. Create a default environment with python=3.6 :code:`conda create -y -n rsMapMPI python=3.6` +2. Activate Environment: :code:`conda activate rsMapMPI` +3. Install MPI and MPI4py with conda: :code:`conda install -y -c conda-forge mpi4py openmpi` +4. Install requirements with pip: :code:`pip install PyQt5==5.15.6 xrayutilities==1.7.3 spec2nexus==2021.1.11 vtk==9.1.0` +5. Install rsMap from root dir: :code:`pip install .` + +Usage +----- + +MPI is implemented through a similar interface to the standard RSMap library files. Due to MPIRun launching many processes in parallel, the GUI is not supported for MPI. A script has been provded as a starting point. + +Running the Program +``````````````````` + +The program can be run via the local mpi run command. In the default installation this is :code:`mpirun -n [num_procs] python [script].py` + +On ALCF systems, this would be run with :code:`aprun -n [num_procs] -N [num_procs per node] python [script].py` + +Script Usage +```````````` + +A script :code:`Scripts/mpiMapSpecAngleScan.py` has been provided as a starting point. The script takes in a .json config file pointing to data files. The path can be provided as a command line argument: :code:`mpiMapSpecAngleScan.py path/to/config.json` If no path is provided, the script searches for :code:`config.json` in the CWD. + +The config format is as follows: + +.. code-block:: json + + { + "scan_range": "[int]-[int]", + "project_dir":"path/to/project", + "spec_file":"spec_file (NOTE: relative to project dir)", + "detector_config":"dtc_file (NOTE: relative to project dir)", + "instrument_config":"inst_file (NOTE: relative to project dir)" + } + + + +Class API Changes +````````````````` + +MPI is implemented in new classes based on the original classes.Due to sharp edges, they are located in source files with the :code:`mpi` prefix. The constructors of MPI classes will require an additional argument: the :code:`mpiComm`. When creating your own scripts, this can be created via the following block of code. It is also highly recommended to use the process rank to prevent the program from repeating script work. + +.. code-block:: python + + from mpi4py import MPI + + mpiComm = MPI.COMM_WORLD + mpiRank = mpiComm.Get_rank() + + if mpiRank == 0: # 0th process only + # Do singleton logging, work, etc. + +Operations like :code:`dataSource.loadSource()` and :code:`gridMapper.doMap()` are now **SYNCHRONOUS**. This means ALL scripts have to enter the same branch for them to complete. The program will _hang_ (not exit) at synchronization points. As an example: + +.. code-block:: python + + if mpiRank == 2: + time.sleep(10) + + gridMapper.doMap() # All process will be forced to wait till process 2 enters this function in 10 seconds + + if mpiRank != 2: # This will cause the program to hang indefinitely + gridMapper.doMap() + + + + +Sharp Edges +----------- + +A major sharp edge is :code:`num scans >= num cores`. If a run only uses 10 scans, no more than 10 cores may be applied. Therefore this parallelization only improves larger runs. The :code:`Scripts/mpiMapSpecAngleScan.py` script checks for this and raises an error if otherwise. It's highly recommended that your scripts do the same. + +Parallelization also introduces sharp edges related to the gridder and loading of data. + +Gridder +``````` + +Two gridder settings are required for this form of parallelization to work: + +1. Normalization must be off. This is the default in xrayutilities==1.7.3. +2. The gridder must be a static size. This is hard-coded false in the mpigridmapper.py file. + +The gridder is passed through MPI channels :code:`nlog(n)` times (where n = num_procs) during the program. An extremely large gridder size may increase runtimes. + +Loading +``````` + +Loading of data into the datasource is parallelized. As such, a datasource merging operation is required. This is implemented via the following block and functions in :code:`MpiSector33SpecDataSource.py`: + + +.. code-block:: python + + scanData = self.exportScans() + scanData = self.mpiMergeSources(scanData) + + scanData = self.mpiComm.bcast(scanData, root=0) + self.importScans(scanData) + +If your implementation requires additional data to be loaded, :code:`exportScans` and :code:`importScans` will need to be modified. If the data is not list or dict based, :code:`mpiMergeSources` will need modification. Custom merge operations can be added to the conditional in the merge method. Any new data being passed through MPI must be able to be pickled. + +Performance Characteristics +--------------------------- + +This section will cover expected program performance and recommendations for optimizing runs. + +Performance Factors +``````````````````` + +The program adds a :code:`nlog(n)` operation where :code:`n = num_procs` to both loading and gridding. The length of a single operation depends primarily on the size of data being passed through a MPI pipe. This means single-node runs will perform this operation faster than multi-node runs. + +Otherwise, gridding is expected to scale with a factor of :code:`1/(2^n)`. Loading is expected to scale the same, but may be limited by disk access speeds. + +Recommended Settings +```````````````````` + +While multi-threading is applied by xrayutilities, it only affects about <5% of execution time. As such, the main parameter to vary is the number of processes. Through testing we recommend :code:`num_procs = num MPI slots` as a reasonable starting point. Experiment with your system to find what works best. \ No newline at end of file