Coverage for crip\physics.py: 67%

206 statements  

« prev     ^ index     » next       coverage.py v7.5.2, created at 2024-07-16 01:15 +0800

1''' 

2 Physics module of crip. 

3 

4 https://github.com/SEU-CT-Recon/crip 

5''' 

6 

7import json 

8import re 

9import numpy as np 

10from os import path 

11import periodictable as pt 

12import enum 

13 

14from ._typing import * 

15from .utils import * 

16from .io import listDirectory 

17from .postprocess import muToHU 

18 

19## Constants ## 

20 

21DiagEnergyLow = 0 # [keV] Lowest energy of diagnostic X-ray 

22DiagEnergyHigh = 150 # [keV] Highest energy of diagnostic X-ray 

23DiagEnergyRange = range(DiagEnergyLow, DiagEnergyHigh + 1) # Diagonstic energy range, [0, 150] keV 

24DiagEnergyLen = DiagEnergyHigh - DiagEnergyLow + 1 

25NA = 6.02214076e23 # Avogadro's number 

26 

27# Alias for built-in materials 

28AttenAliases = { 

29 'Gold': 'Au', 

30 'Carbon': 'C', 

31 'Copper': 'Cu', 

32 'Iodine': 'I', 

33 'H2O': 'Water', 

34 'Gadolinium': 'Gd', 

35} 

36 

37 

38class EnergyConversion(enum.Enum): 

39 ''' Energy conversion efficiency marker of the detector. 

40 ''' 

41 EID = 'EID' # Energy-Integrating Detector 

42 PCD = 'PCD' # Photon-Counting Detector 

43 

44 

45def getCommonDensity(materialName: str): 

46 ''' Get the common density of a material [g/cm^3] from built-in dataset. 

47 ''' 

48 db = json.loads(readFileText(path.join(getAsset('atten'), 'density.json'))) 

49 

50 if materialName in AttenAliases: 

51 materialName = AttenAliases[materialName] 

52 cripAssert(materialName in db, f'Material not found in density dataset: {materialName}') 

53 

54 return db[materialName] 

55 

56 

57class Spectrum: 

58 

59 def __init__(self, omega: NDArray, unit='keV'): 

60 ''' Construct a Spectrum object with omega array describing every energy bin in DiagEnergyRange. 

61 To access omega of certain energy E [keV], use `spec.omega[E]`. 

62 ''' 

63 cripAssert( 

64 len(omega) == DiagEnergyLen, 

65 f'omega array should length of DiagEnergyLen ({DiagEnergyLen}), but got {len(omega)}.') 

66 cripAssert(unit in EnergyUnits, f'Invalid unit: {unit}.') 

67 

68 self.unit = unit 

69 self.omega = np.array(omega, dtype=DefaultFloatDType) 

70 self.sumOmega = np.sum(self.omega) 

71 

72 def isMonochromatic(self): 

73 ''' Test if the spectrum is monochromatic. If so, return True and the energy. 

74 ''' 

75 at = -1 

76 for i in DiagEnergyRange: 

77 if self.omega[i] > 0: 

78 if at != -1: 

79 return False, None 

80 at = i 

81 

82 return True, at 

83 

84 @staticmethod 

85 def fromText(specText: str, unit='keV'): 

86 ''' Parse spectrum text into `Spectrum` object. 

87 The text should be in the format of `<energy> <omega>`, one pair per line. 

88 Leading lines starting with non-digit will be ignored. 

89 All `<energy>` should be in ascending order and continous by 1 keV and inside `DiagEnergyRange`. 

90 Recommend you to end the last line with `omega=-1` to indicate the end of spectrum. 

91 ''' 

92 cripAssert(unit in EnergyUnits, f'Invalid unit: {unit}.') 

93 omega = np.zeros(DiagEnergyLen, dtype=DefaultFloatDType) 

94 

95 # split content into list, and ignore all lines starting with non-digit 

96 content = list( 

97 filter(lambda y: len(y) > 0 and str.isdigit(y[0]), 

98 list(map(lambda x: x.strip(), 

99 specText.replace('\r\n', '\n').split('\n'))))) 

100 

101 # process each line 

102 def proc1(line: str): 

103 tup = tuple(map(float, re.split(r'\s+', line))) 

104 cripAssert(len(tup) == 2, f'Invalid line in spectrum: {line}.') 

105 

106 return tup # (energy, omega) 

107 

108 # parse the spectrum text to get provided omega array 

109 content = np.array(list(map(proc1, content)), dtype=DefaultFloatDType) 

110 specEnergy, specOmega = content.T 

111 specEnergy = convertEnergyUnit(specEnergy, unit, DefaultEnergyUnit) 

112 cripAssert(np.all(np.diff(specEnergy) == 1), 'Energy should be in ascending order and continous by 1 keV.') 

113 specOmega[specOmega < 0] = 0 

114 startEnergy, cutOffEnergy = int(specEnergy[0]), int(specEnergy[-1]) 

115 cripAssert(startEnergy in DiagEnergyRange, f'startEnergy is out of DiagEnergyRange: {startEnergy} keV') 

116 cripAssert(cutOffEnergy in DiagEnergyRange, f'cutOffEnergy is out of DiagEnergyRange: {cutOffEnergy} keV') 

117 

118 # fill omega array 

119 omega[startEnergy:cutOffEnergy + 1] = specOmega[:] 

120 

121 return Spectrum(omega, unit) 

122 

123 @staticmethod 

124 def fromFile(path: str, unit='keV'): 

125 ''' Construct a Spectrum object from spectrum file whose format follows `fromText`. 

126 ''' 

127 return Spectrum.fromText(readFileText(path), unit) 

128 

129 @staticmethod 

130 def monochromatic(at: int, unit='keV', omega=10**5): 

131 ''' Construct a monochromatic spectrum at `at`. 

132 ''' 

133 text = '{} {}\n{} -1'.format(str(at), str(omega), str(at + 1)) 

134 

135 return Spectrum.fromText(text, unit) 

136 

137 

138class Atten: 

139 

140 def __init__(self, muArray: NDArray, density: Or[None, float] = None) -> None: 

141 ''' Construct an Atten object with mu array describing every LAC for energy bins in DiagEnergyRange. 

142 The density is in [g/cm^3]. To access mu [mm-1] of certain energy E [keV], use `atten.mu[E]`. 

143 ''' 

144 cripAssert(len(muArray) == DiagEnergyLen, f'muArray should have length of {DiagEnergyLen} energy bins.') 

145 if density is not None: 

146 cripAssert(density > 0, 'Density should be positive.') 

147 

148 self.mu = muArray 

149 self.rho = density 

150 self.attenText = '' 

151 self.energyUnit = 'keV' 

152 

153 @staticmethod 

154 def builtInAttenList() -> List[str]: 

155 ''' Get all built-in material names and aliases. 

156 ''' 

157 attenListPath = path.join(getAsset('atten'), 'data') 

158 attenList = list(map(lambda x: x.replace('.txt', ''), listDirectory(attenListPath, style='filename'))) 

159 attenList.extend(AttenAliases.keys()) 

160 

161 return attenList 

162 

163 @staticmethod 

164 def builtInAttenText(material: str): 

165 ''' Get built-in atten file content of `material` from NIST data source. 

166 ''' 

167 if material in AttenAliases: 

168 material = AttenAliases[material] 

169 

170 attenFilePath = path.join(getAsset('atten'), f'data/{material}.txt') 

171 cripAssert(path.exists(attenFilePath), f'Atten file for {material} does not exist.') 

172 

173 return readFileText(attenFilePath) 

174 

175 @staticmethod 

176 def fromBuiltIn(material: str, density: Or[float, None] = None): 

177 ''' Get a built-in atten object for `material`. 

178 Call `builtInAttenList` to inspect all available materials. 

179 The density is in [g/cm^3], and will be automatically filled with common value if not provided. 

180 ''' 

181 if material in AttenAliases: 

182 material = AttenAliases[material] 

183 

184 if density is None: 

185 density = getCommonDensity(material) 

186 

187 return Atten.fromText(Atten.builtInAttenText(material), density, BuiltInAttenEnergyUnit) 

188 

189 @staticmethod 

190 def fromText(attenText: str, density: float, energyUnit='MeV'): 

191 ''' Parse atten text into `Atten` object. Interpolation will be performed to fill `DiagEnergyRange`. 

192 The text should be in the format of `<energy> <mu/density>` one pair per line, 

193 or `<energy> <mu/density> <mu_en/density>` where the second column will be used. 

194 Leading lines starting with non-digit will be ignored. 

195 ''' 

196 rho = density 

197 

198 # split attenText into list, and ignore all lines starting with non-digit 

199 content = list( 

200 filter(lambda y: len(y) > 0 and str.isdigit(y[0]), 

201 list(map(lambda x: x.strip(), 

202 attenText.replace('\r\n', '\n').split('\n'))))) 

203 

204 # process each line 

205 def proc1(line: str): 

206 tup = tuple(map(float, re.split(r'\s+', line))) 

207 cripAssert(len(tup) in [2, 3], f'Invalid line in attenText: {line}.') 

208 energy, muDivRho = tuple(map(float, re.split(r'\s+', line)))[0:2] 

209 

210 return energy, muDivRho 

211 

212 # parse the atten text to get provided mu array 

213 content = np.array(list(map(proc1, content)), dtype=DefaultFloatDType) 

214 energy, muDivRho = content.T 

215 energy = convertEnergyUnit(energy, energyUnit, DefaultEnergyUnit) # keV 

216 

217 # perform log-domain interpolation to fill in `DiagEnergyRange` 

218 interpEnergy = np.log(DiagEnergyRange[1:]) # avoid log(0) 

219 interpMuDivRho = np.interp(interpEnergy, np.log(energy), np.log(muDivRho)) 

220 

221 # now we have mu for every energy in `DiagEnergyRange`. 

222 mu = np.exp(interpMuDivRho) * rho # cm-1 

223 mu = convertMuUnit(mu, 'cm-1', DefaultMuUnit) # mm-1 

224 mu = np.insert(mu, 0, 0, axis=0) # prepend for E=0 

225 

226 return Atten.fromMuArray(mu, rho) 

227 

228 @staticmethod 

229 def fromMuArray(muArray: NDArray, density: Or[float, None] = None): 

230 ''' Construct a new material from mu array for every energy in `DiagEnergyRange`. 

231 The density is in [g/cm^3]. 

232 ''' 

233 return Atten(muArray, density) 

234 

235 @staticmethod 

236 def fromFile(path: str, density: float, energyUnit='MeV'): 

237 ''' Construct a new material from file where the format follows `fromText`. 

238 ''' 

239 return Atten.fromText(readFileText(path), density, energyUnit) 

240 

241 

242WaterAtten = Atten.fromBuiltIn('Water') 

243 

244 

245def computeMu(atten: Atten, spec: Spectrum, energyConversion: EnergyConversion) -> float: 

246 ''' Calculate the LAC (mu) [mm-1] for certain Atten under a Spectrum. 

247 `energyConversion` determines the energy conversion efficiency of the detector. 

248 ''' 

249 mus = atten.mu 

250 eff = { 

251 EnergyConversion.PCD: 1, 

252 EnergyConversion.EID: np.array(DiagEnergyRange), 

253 }[energyConversion] 

254 

255 return np.sum(spec.omega * eff * mus) / np.sum(spec.omega * eff) 

256 

257 

258def computeAttenedSpectrum(spec: Spectrum, attens: List[Atten], Ls: List[float]) -> Spectrum: 

259 ''' Calculate the spectrum after attenuation through `attens` with thickness `Ls` [mm] 

260 using Beer-Lambert law. 

261 ''' 

262 cripAssert(len(attens) == len(Ls), '`attens` should be paired with `Ls`.') 

263 

264 N = len(attens) 

265 omega = np.array(spec.omega, copy=True) 

266 for i in range(N): 

267 atten, L = attens[i], Ls[i] 

268 for E in DiagEnergyRange: # energies 

269 omega[E] *= np.exp(-atten.mu[E] * L) 

270 

271 return Spectrum(omega, spec.unit) 

272 

273 

274def normalizeSpectrum(spec: Spectrum): 

275 ''' Normalize a Spectrum. 

276 ''' 

277 return Spectrum(spec.omega / spec.sumOmega, spec.unit) 

278 

279 

280def forwardProjectWithSpectrum(lengths: List[TwoD], materials: List[Atten], spec: Spectrum, energyConversion: str): 

281 ''' Perform forward projection using `spec`. `lengths` is a list of corresponding length [mm] images  

282 (projection or sinogram) of `materials`.  

283 This function would simulate attenuation and Beam Hardening but no scatter. 

284  

285 Set `lengths` and `materials` to empty lists to compute the flat field. 

286 

287 It's highly recommended to use a monochronmatic spectrum to accelerate if you simulate a lot. 

288 ''' 

289 cripAssert(len(lengths) == len(materials), 'Lengths and materials should correspond.') 

290 cripAssert(all([isOfSameShape(lengths[0], x) for x in lengths]), 'Lengths map should have same shape.') 

291 cripAssert(energyConversion in ['PCD', 'EID'], 'Invalid energyConversion.') 

292 

293 efficiency = 1 if energyConversion == 'PCD' else np.array(DiagEnergyRange) 

294 

295 if len(lengths) == 0: 

296 # compute the flat field 

297 ones = np.ones_like(lengths[0], dtype=DefaultFloatDType) if len(lengths) > 0 else 1 

298 effectiveOmega = spec.omega * efficiency 

299 

300 return np.sum(effectiveOmega) * ones 

301 

302 resultShape = lengths[0].shape 

303 

304 # speed up when it's monochromatic 

305 isMono, monoAt = spec.isMonochromatic() 

306 if isMono: 

307 attenuations = 0.0 

308 for length, material in zip(lengths, materials): 

309 attenuations += length * material.mu[monoAt] 

310 

311 attened = spec.omega[monoAt] * np.exp(-attenuations) * (1 if energyConversion == 'PCD' else monoAt 

312 ) # the attenuated image 

313 

314 return attened 

315 else: 

316 # a[h, w] = [vector of attenuation in that energy bin] 

317 attenuations = np.zeros((*resultShape, DiagEnergyLen), dtype=DefaultFloatDType) 

318 for length, material in zip(lengths, materials): 

319 attenuations += np.outer(length, material.mu).reshape((*resultShape, DiagEnergyLen)) 

320 

321 attened = spec.omega * np.exp(-attenuations) * efficiency # the attenuated image 

322 

323 return np.sum(attened, axis=-1) 

324 

325 

326def brewPowderSolution(solute: Atten, 

327 solvent: Atten, 

328 concentration: float, 

329 concentrationUnit='mg/mL', 

330 rhoSolution: Or[float, None] = None) -> Atten: 

331 ''' Generate the `Atten` of ideal powder solution with certain concentration. 

332 ''' 

333 cripAssert(concentrationUnit in ConcentrationUnits, f'Invalid concentration unit: {concentrationUnit}.') 

334 

335 concentration = convertConcentrationUnit(concentration, concentrationUnit, 'g/mL') 

336 mu = solvent.mu + (solute.mu / solute.rho) * concentration 

337 atten = Atten.fromMuArray(mu, rhoSolution) 

338 

339 return atten 

340 

341 

342def computeContrastHU(contrast: Atten, 

343 spec: Spectrum, 

344 energyConversion: EnergyConversion, 

345 base: Atten = WaterAtten) -> float: 

346 ''' Compute the HU difference caused by contrast. 

347 ''' 

348 _calcMu = lambda atten: computeMu(atten, spec, energyConversion) 

349 

350 muWater = _calcMu(WaterAtten) 

351 muContrast = _calcMu(contrast) 

352 if base is not WaterAtten: 

353 muBase = _calcMu(base) 

354 else: 

355 muBase = muWater 

356 

357 return muToHU(muContrast, muWater) - muToHU(muBase, muWater) 

358 

359 

360def computePathLength(thickness: float, sod: float, detH: int, detW: int, detElementSize: float) -> NDArray: 

361 ''' Compute the ray peneration pathlength for each detector element using a cuboid object with `thickness`. 

362 `sod` is the Source-Object Distance. `(detH, detW)` is the detector size [pixel]. 

363 `detElementSize` is the element size of detector. 

364 All length units are recommended to be [mm]. Currently no object offset can be assumed. 

365 ''' 

366 detCenter = (detW / 2, detH / 2) 

367 r, c = np.meshgrid(np.arange(detH), np.arange(detW)) 

368 coords = np.array((r.flatten(), c.flatten()), dtype=np.float32).T 

369 

370 offcenter = coords - np.array(detCenter) 

371 offcenter = np.abs(offcenter) * detElementSize 

372 offcenterDist = np.sqrt(np.sum(offcenter**2, axis=1)) 

373 

374 theta = np.arctan(offcenterDist / sod) 

375 rayIntersect = thickness / np.cos(theta) 

376 rayIntersect = rayIntersect.reshape((detW, detH)).T 

377 

378 return rayIntersect 

379 

380 

381def atomsFromMolecule(molecule: str) -> Dict[str, int]: 

382 ''' Parse the molecule string to get the atoms and their counts. 

383 e.g. `'H2 O1' -> {'H': 2, 'O': 1}` 

384 ''' 

385 atoms = {} 

386 for part in molecule.split(' '): 

387 count = ''.join(filter(str.isdigit, part)) 

388 element = ''.join(filter(str.isalpha, part)) 

389 atoms[element] = int(count) 

390 

391 return atoms 

392 

393 

394def zeffTheoretical(molecule: str, m=2.94) -> float: 

395 ''' Compute the theoretical effective atomic number (Zeff) of a molecule using the power law with parameter `m`. 

396 `molecule` is parsed by `atomsFromMolecule`. 

397 [1] https://en.wikipedia.org/wiki/Effective_atomic_number_(compounds_and_mixtures) 

398 ''' 

399 atoms = atomsFromMolecule(molecule) 

400 totalElectrons = 0 

401 for atom in atoms: 

402 totalElectrons += atoms[atom] * pt.elements.symbol(atom).number 

403 

404 sumUnderSqrt = 0 

405 for atom in atoms: 

406 Z = pt.elements.symbol(atom).number # atomic number 

407 f = atoms[atom] * Z / totalElectrons # fraction 

408 sumUnderSqrt += f * (Z**m) 

409 

410 return sumUnderSqrt**(1 / m) 

411 

412 

413def zeffExperimental(a1: float, a2: float, rhoe1: float, rhoe2: float, zeff1: float, zeff2: float, m=2.94): 

414 ''' Compute the experimental effective atomic number (Zeff) from Dual-Energy Two-Material decomposition 

415 using the power law with parameter `m`. `(a1, a2)` are material decomposition coefficients. 

416 `(rhoe1, rhoe2)` are electron densities of material bases. 

417 `(zeff1, zeff2)` are effective atomic numbers of material bases. 

418 ''' 

419 n1 = a1 * rhoe1 * zeff1**m 

420 n2 = a2 * rhoe2 * zeff2**m 

421 d1 = a1 * rhoe1 

422 d2 = a2 * rhoe2 

423 

424 return ((n1 + n2) / (d1 + d2))**(1 / m)