Coverage for crip\physics.py: 67%
206 statements
« prev ^ index » next coverage.py v7.5.2, created at 2024-07-16 01:15 +0800
« prev ^ index » next coverage.py v7.5.2, created at 2024-07-16 01:15 +0800
1'''
2 Physics module of crip.
4 https://github.com/SEU-CT-Recon/crip
5'''
7import json
8import re
9import numpy as np
10from os import path
11import periodictable as pt
12import enum
14from ._typing import *
15from .utils import *
16from .io import listDirectory
17from .postprocess import muToHU
19## Constants ##
21DiagEnergyLow = 0 # [keV] Lowest energy of diagnostic X-ray
22DiagEnergyHigh = 150 # [keV] Highest energy of diagnostic X-ray
23DiagEnergyRange = range(DiagEnergyLow, DiagEnergyHigh + 1) # Diagonstic energy range, [0, 150] keV
24DiagEnergyLen = DiagEnergyHigh - DiagEnergyLow + 1
25NA = 6.02214076e23 # Avogadro's number
27# Alias for built-in materials
28AttenAliases = {
29 'Gold': 'Au',
30 'Carbon': 'C',
31 'Copper': 'Cu',
32 'Iodine': 'I',
33 'H2O': 'Water',
34 'Gadolinium': 'Gd',
35}
38class EnergyConversion(enum.Enum):
39 ''' Energy conversion efficiency marker of the detector.
40 '''
41 EID = 'EID' # Energy-Integrating Detector
42 PCD = 'PCD' # Photon-Counting Detector
45def getCommonDensity(materialName: str):
46 ''' Get the common density of a material [g/cm^3] from built-in dataset.
47 '''
48 db = json.loads(readFileText(path.join(getAsset('atten'), 'density.json')))
50 if materialName in AttenAliases:
51 materialName = AttenAliases[materialName]
52 cripAssert(materialName in db, f'Material not found in density dataset: {materialName}')
54 return db[materialName]
57class Spectrum:
59 def __init__(self, omega: NDArray, unit='keV'):
60 ''' Construct a Spectrum object with omega array describing every energy bin in DiagEnergyRange.
61 To access omega of certain energy E [keV], use `spec.omega[E]`.
62 '''
63 cripAssert(
64 len(omega) == DiagEnergyLen,
65 f'omega array should length of DiagEnergyLen ({DiagEnergyLen}), but got {len(omega)}.')
66 cripAssert(unit in EnergyUnits, f'Invalid unit: {unit}.')
68 self.unit = unit
69 self.omega = np.array(omega, dtype=DefaultFloatDType)
70 self.sumOmega = np.sum(self.omega)
72 def isMonochromatic(self):
73 ''' Test if the spectrum is monochromatic. If so, return True and the energy.
74 '''
75 at = -1
76 for i in DiagEnergyRange:
77 if self.omega[i] > 0:
78 if at != -1:
79 return False, None
80 at = i
82 return True, at
84 @staticmethod
85 def fromText(specText: str, unit='keV'):
86 ''' Parse spectrum text into `Spectrum` object.
87 The text should be in the format of `<energy> <omega>`, one pair per line.
88 Leading lines starting with non-digit will be ignored.
89 All `<energy>` should be in ascending order and continous by 1 keV and inside `DiagEnergyRange`.
90 Recommend you to end the last line with `omega=-1` to indicate the end of spectrum.
91 '''
92 cripAssert(unit in EnergyUnits, f'Invalid unit: {unit}.')
93 omega = np.zeros(DiagEnergyLen, dtype=DefaultFloatDType)
95 # split content into list, and ignore all lines starting with non-digit
96 content = list(
97 filter(lambda y: len(y) > 0 and str.isdigit(y[0]),
98 list(map(lambda x: x.strip(),
99 specText.replace('\r\n', '\n').split('\n')))))
101 # process each line
102 def proc1(line: str):
103 tup = tuple(map(float, re.split(r'\s+', line)))
104 cripAssert(len(tup) == 2, f'Invalid line in spectrum: {line}.')
106 return tup # (energy, omega)
108 # parse the spectrum text to get provided omega array
109 content = np.array(list(map(proc1, content)), dtype=DefaultFloatDType)
110 specEnergy, specOmega = content.T
111 specEnergy = convertEnergyUnit(specEnergy, unit, DefaultEnergyUnit)
112 cripAssert(np.all(np.diff(specEnergy) == 1), 'Energy should be in ascending order and continous by 1 keV.')
113 specOmega[specOmega < 0] = 0
114 startEnergy, cutOffEnergy = int(specEnergy[0]), int(specEnergy[-1])
115 cripAssert(startEnergy in DiagEnergyRange, f'startEnergy is out of DiagEnergyRange: {startEnergy} keV')
116 cripAssert(cutOffEnergy in DiagEnergyRange, f'cutOffEnergy is out of DiagEnergyRange: {cutOffEnergy} keV')
118 # fill omega array
119 omega[startEnergy:cutOffEnergy + 1] = specOmega[:]
121 return Spectrum(omega, unit)
123 @staticmethod
124 def fromFile(path: str, unit='keV'):
125 ''' Construct a Spectrum object from spectrum file whose format follows `fromText`.
126 '''
127 return Spectrum.fromText(readFileText(path), unit)
129 @staticmethod
130 def monochromatic(at: int, unit='keV', omega=10**5):
131 ''' Construct a monochromatic spectrum at `at`.
132 '''
133 text = '{} {}\n{} -1'.format(str(at), str(omega), str(at + 1))
135 return Spectrum.fromText(text, unit)
138class Atten:
140 def __init__(self, muArray: NDArray, density: Or[None, float] = None) -> None:
141 ''' Construct an Atten object with mu array describing every LAC for energy bins in DiagEnergyRange.
142 The density is in [g/cm^3]. To access mu [mm-1] of certain energy E [keV], use `atten.mu[E]`.
143 '''
144 cripAssert(len(muArray) == DiagEnergyLen, f'muArray should have length of {DiagEnergyLen} energy bins.')
145 if density is not None:
146 cripAssert(density > 0, 'Density should be positive.')
148 self.mu = muArray
149 self.rho = density
150 self.attenText = ''
151 self.energyUnit = 'keV'
153 @staticmethod
154 def builtInAttenList() -> List[str]:
155 ''' Get all built-in material names and aliases.
156 '''
157 attenListPath = path.join(getAsset('atten'), 'data')
158 attenList = list(map(lambda x: x.replace('.txt', ''), listDirectory(attenListPath, style='filename')))
159 attenList.extend(AttenAliases.keys())
161 return attenList
163 @staticmethod
164 def builtInAttenText(material: str):
165 ''' Get built-in atten file content of `material` from NIST data source.
166 '''
167 if material in AttenAliases:
168 material = AttenAliases[material]
170 attenFilePath = path.join(getAsset('atten'), f'data/{material}.txt')
171 cripAssert(path.exists(attenFilePath), f'Atten file for {material} does not exist.')
173 return readFileText(attenFilePath)
175 @staticmethod
176 def fromBuiltIn(material: str, density: Or[float, None] = None):
177 ''' Get a built-in atten object for `material`.
178 Call `builtInAttenList` to inspect all available materials.
179 The density is in [g/cm^3], and will be automatically filled with common value if not provided.
180 '''
181 if material in AttenAliases:
182 material = AttenAliases[material]
184 if density is None:
185 density = getCommonDensity(material)
187 return Atten.fromText(Atten.builtInAttenText(material), density, BuiltInAttenEnergyUnit)
189 @staticmethod
190 def fromText(attenText: str, density: float, energyUnit='MeV'):
191 ''' Parse atten text into `Atten` object. Interpolation will be performed to fill `DiagEnergyRange`.
192 The text should be in the format of `<energy> <mu/density>` one pair per line,
193 or `<energy> <mu/density> <mu_en/density>` where the second column will be used.
194 Leading lines starting with non-digit will be ignored.
195 '''
196 rho = density
198 # split attenText into list, and ignore all lines starting with non-digit
199 content = list(
200 filter(lambda y: len(y) > 0 and str.isdigit(y[0]),
201 list(map(lambda x: x.strip(),
202 attenText.replace('\r\n', '\n').split('\n')))))
204 # process each line
205 def proc1(line: str):
206 tup = tuple(map(float, re.split(r'\s+', line)))
207 cripAssert(len(tup) in [2, 3], f'Invalid line in attenText: {line}.')
208 energy, muDivRho = tuple(map(float, re.split(r'\s+', line)))[0:2]
210 return energy, muDivRho
212 # parse the atten text to get provided mu array
213 content = np.array(list(map(proc1, content)), dtype=DefaultFloatDType)
214 energy, muDivRho = content.T
215 energy = convertEnergyUnit(energy, energyUnit, DefaultEnergyUnit) # keV
217 # perform log-domain interpolation to fill in `DiagEnergyRange`
218 interpEnergy = np.log(DiagEnergyRange[1:]) # avoid log(0)
219 interpMuDivRho = np.interp(interpEnergy, np.log(energy), np.log(muDivRho))
221 # now we have mu for every energy in `DiagEnergyRange`.
222 mu = np.exp(interpMuDivRho) * rho # cm-1
223 mu = convertMuUnit(mu, 'cm-1', DefaultMuUnit) # mm-1
224 mu = np.insert(mu, 0, 0, axis=0) # prepend for E=0
226 return Atten.fromMuArray(mu, rho)
228 @staticmethod
229 def fromMuArray(muArray: NDArray, density: Or[float, None] = None):
230 ''' Construct a new material from mu array for every energy in `DiagEnergyRange`.
231 The density is in [g/cm^3].
232 '''
233 return Atten(muArray, density)
235 @staticmethod
236 def fromFile(path: str, density: float, energyUnit='MeV'):
237 ''' Construct a new material from file where the format follows `fromText`.
238 '''
239 return Atten.fromText(readFileText(path), density, energyUnit)
242WaterAtten = Atten.fromBuiltIn('Water')
245def computeMu(atten: Atten, spec: Spectrum, energyConversion: EnergyConversion) -> float:
246 ''' Calculate the LAC (mu) [mm-1] for certain Atten under a Spectrum.
247 `energyConversion` determines the energy conversion efficiency of the detector.
248 '''
249 mus = atten.mu
250 eff = {
251 EnergyConversion.PCD: 1,
252 EnergyConversion.EID: np.array(DiagEnergyRange),
253 }[energyConversion]
255 return np.sum(spec.omega * eff * mus) / np.sum(spec.omega * eff)
258def computeAttenedSpectrum(spec: Spectrum, attens: List[Atten], Ls: List[float]) -> Spectrum:
259 ''' Calculate the spectrum after attenuation through `attens` with thickness `Ls` [mm]
260 using Beer-Lambert law.
261 '''
262 cripAssert(len(attens) == len(Ls), '`attens` should be paired with `Ls`.')
264 N = len(attens)
265 omega = np.array(spec.omega, copy=True)
266 for i in range(N):
267 atten, L = attens[i], Ls[i]
268 for E in DiagEnergyRange: # energies
269 omega[E] *= np.exp(-atten.mu[E] * L)
271 return Spectrum(omega, spec.unit)
274def normalizeSpectrum(spec: Spectrum):
275 ''' Normalize a Spectrum.
276 '''
277 return Spectrum(spec.omega / spec.sumOmega, spec.unit)
280def forwardProjectWithSpectrum(lengths: List[TwoD], materials: List[Atten], spec: Spectrum, energyConversion: str):
281 ''' Perform forward projection using `spec`. `lengths` is a list of corresponding length [mm] images
282 (projection or sinogram) of `materials`.
283 This function would simulate attenuation and Beam Hardening but no scatter.
285 Set `lengths` and `materials` to empty lists to compute the flat field.
287 It's highly recommended to use a monochronmatic spectrum to accelerate if you simulate a lot.
288 '''
289 cripAssert(len(lengths) == len(materials), 'Lengths and materials should correspond.')
290 cripAssert(all([isOfSameShape(lengths[0], x) for x in lengths]), 'Lengths map should have same shape.')
291 cripAssert(energyConversion in ['PCD', 'EID'], 'Invalid energyConversion.')
293 efficiency = 1 if energyConversion == 'PCD' else np.array(DiagEnergyRange)
295 if len(lengths) == 0:
296 # compute the flat field
297 ones = np.ones_like(lengths[0], dtype=DefaultFloatDType) if len(lengths) > 0 else 1
298 effectiveOmega = spec.omega * efficiency
300 return np.sum(effectiveOmega) * ones
302 resultShape = lengths[0].shape
304 # speed up when it's monochromatic
305 isMono, monoAt = spec.isMonochromatic()
306 if isMono:
307 attenuations = 0.0
308 for length, material in zip(lengths, materials):
309 attenuations += length * material.mu[monoAt]
311 attened = spec.omega[monoAt] * np.exp(-attenuations) * (1 if energyConversion == 'PCD' else monoAt
312 ) # the attenuated image
314 return attened
315 else:
316 # a[h, w] = [vector of attenuation in that energy bin]
317 attenuations = np.zeros((*resultShape, DiagEnergyLen), dtype=DefaultFloatDType)
318 for length, material in zip(lengths, materials):
319 attenuations += np.outer(length, material.mu).reshape((*resultShape, DiagEnergyLen))
321 attened = spec.omega * np.exp(-attenuations) * efficiency # the attenuated image
323 return np.sum(attened, axis=-1)
326def brewPowderSolution(solute: Atten,
327 solvent: Atten,
328 concentration: float,
329 concentrationUnit='mg/mL',
330 rhoSolution: Or[float, None] = None) -> Atten:
331 ''' Generate the `Atten` of ideal powder solution with certain concentration.
332 '''
333 cripAssert(concentrationUnit in ConcentrationUnits, f'Invalid concentration unit: {concentrationUnit}.')
335 concentration = convertConcentrationUnit(concentration, concentrationUnit, 'g/mL')
336 mu = solvent.mu + (solute.mu / solute.rho) * concentration
337 atten = Atten.fromMuArray(mu, rhoSolution)
339 return atten
342def computeContrastHU(contrast: Atten,
343 spec: Spectrum,
344 energyConversion: EnergyConversion,
345 base: Atten = WaterAtten) -> float:
346 ''' Compute the HU difference caused by contrast.
347 '''
348 _calcMu = lambda atten: computeMu(atten, spec, energyConversion)
350 muWater = _calcMu(WaterAtten)
351 muContrast = _calcMu(contrast)
352 if base is not WaterAtten:
353 muBase = _calcMu(base)
354 else:
355 muBase = muWater
357 return muToHU(muContrast, muWater) - muToHU(muBase, muWater)
360def computePathLength(thickness: float, sod: float, detH: int, detW: int, detElementSize: float) -> NDArray:
361 ''' Compute the ray peneration pathlength for each detector element using a cuboid object with `thickness`.
362 `sod` is the Source-Object Distance. `(detH, detW)` is the detector size [pixel].
363 `detElementSize` is the element size of detector.
364 All length units are recommended to be [mm]. Currently no object offset can be assumed.
365 '''
366 detCenter = (detW / 2, detH / 2)
367 r, c = np.meshgrid(np.arange(detH), np.arange(detW))
368 coords = np.array((r.flatten(), c.flatten()), dtype=np.float32).T
370 offcenter = coords - np.array(detCenter)
371 offcenter = np.abs(offcenter) * detElementSize
372 offcenterDist = np.sqrt(np.sum(offcenter**2, axis=1))
374 theta = np.arctan(offcenterDist / sod)
375 rayIntersect = thickness / np.cos(theta)
376 rayIntersect = rayIntersect.reshape((detW, detH)).T
378 return rayIntersect
381def atomsFromMolecule(molecule: str) -> Dict[str, int]:
382 ''' Parse the molecule string to get the atoms and their counts.
383 e.g. `'H2 O1' -> {'H': 2, 'O': 1}`
384 '''
385 atoms = {}
386 for part in molecule.split(' '):
387 count = ''.join(filter(str.isdigit, part))
388 element = ''.join(filter(str.isalpha, part))
389 atoms[element] = int(count)
391 return atoms
394def zeffTheoretical(molecule: str, m=2.94) -> float:
395 ''' Compute the theoretical effective atomic number (Zeff) of a molecule using the power law with parameter `m`.
396 `molecule` is parsed by `atomsFromMolecule`.
397 [1] https://en.wikipedia.org/wiki/Effective_atomic_number_(compounds_and_mixtures)
398 '''
399 atoms = atomsFromMolecule(molecule)
400 totalElectrons = 0
401 for atom in atoms:
402 totalElectrons += atoms[atom] * pt.elements.symbol(atom).number
404 sumUnderSqrt = 0
405 for atom in atoms:
406 Z = pt.elements.symbol(atom).number # atomic number
407 f = atoms[atom] * Z / totalElectrons # fraction
408 sumUnderSqrt += f * (Z**m)
410 return sumUnderSqrt**(1 / m)
413def zeffExperimental(a1: float, a2: float, rhoe1: float, rhoe2: float, zeff1: float, zeff2: float, m=2.94):
414 ''' Compute the experimental effective atomic number (Zeff) from Dual-Energy Two-Material decomposition
415 using the power law with parameter `m`. `(a1, a2)` are material decomposition coefficients.
416 `(rhoe1, rhoe2)` are electron densities of material bases.
417 `(zeff1, zeff2)` are effective atomic numbers of material bases.
418 '''
419 n1 = a1 * rhoe1 * zeff1**m
420 n2 = a2 * rhoe2 * zeff2**m
421 d1 = a1 * rhoe1
422 d2 = a2 * rhoe2
424 return ((n1 + n2) / (d1 + d2))**(1 / m)