Coverage for crip\io.py: 25%

106 statements  

« prev     ^ index     » next       coverage.py v7.5.2, created at 2024-07-16 01:15 +0800

1''' 

2 I/O module of crip. 

3 

4 https://github.com/SEU-CT-Recon/crip 

5''' 

6 

7import os 

8import re 

9import numpy as np 

10import tifffile 

11import pydicom 

12import natsort 

13 

14from .utils import * 

15from ._typing import * 

16 

17 

18def listDirectory(folder: str, 

19 style='filename', 

20 match: Or[re.Pattern, str, None] = None, 

21 sort='nat', 

22 reverse=False) -> Or[List[str], Iterable[Tuple[str, str]]]: 

23 ''' List files under `folder` and `sort` in `nat`(ural) or `dict`(ionary) order. 

24 Return `style` can be `filename`, `fullpath` or `both` (path, name) tuple. 

25 Results can be filtered by `match` and reversed by `reverse`. 

26 ''' 

27 cripAssert(sort in ['nat', 'dict'], f'Invalid `sort`: {sort}.') 

28 cripAssert(style in ['filename', 'fullpath', 'both'], f'Invalid `style`: {style}.') 

29 

30 files = os.listdir(folder) 

31 

32 if match is not None: 

33 if isinstance(match, str): 

34 match = re.compile(match) 

35 files = list(filter(lambda x: match.search(x), files)) 

36 

37 files = sorted(files, reverse=reverse) if sort == 'dict' else natsort.natsorted( 

38 files, reverse=reverse, alg=natsort.ns.DEFAULT) 

39 

40 if style == 'filename': 

41 return files 

42 elif style == 'fullpath': 

43 return [os.path.join(folder, file) for file in files] 

44 elif style == 'both': 

45 return zip([os.path.join(folder, file) for file in files], files) 

46 

47 

48def imreadDicom(path: str, dtype=None, attrs: Or[None, Dict[str, Any]] = None) -> NDArray: 

49 ''' Read DICOM file, return numpy array. Use `attrs` to supplement DICOM tags for non-standard images. 

50 You should be very careful whether the Rescale is cancelled for CT images. 

51 Convert dtype when `dtype` is not `None`. 

52 ''' 

53 dcm = pydicom.dcmread(path) 

54 

55 if attrs is not None: 

56 for key in attrs: 

57 dcm.__setattr__(key, attrs[key]) 

58 

59 if dtype is not None: 

60 return np.array(dcm.pixel_array).astype(dtype) 

61 else: 

62 return np.array(dcm.pixel_array) 

63 

64 

65def imreadDicoms(dir_: str, **kwargs) -> NDArray: 

66 ''' Read series of DICOM files in a directory. `kwargs` is forwarded to `imreadDicom`. 

67 ''' 

68 imgs = [imreadDicom(x, **kwargs) for x in listDirectory(dir_, style='fullpath')] 

69 

70 return np.array(imgs) 

71 

72 

73def readDicom(path: str) -> pydicom.Dataset: 

74 ''' Read DICOM file as pydicom Dataset object. 

75 ''' 

76 return pydicom.read_file(path) 

77 

78 

79def imreadRaw(path: str, 

80 h: int, 

81 w: int, 

82 dtype=DefaultFloatDType, 

83 nSlice: int = 1, 

84 offset: int = 0, 

85 gap: int = 0, 

86 swapEndian=False) -> NDArray: 

87 ''' Read binary raw file stored in `dtype`. 

88 Return numpy array with shape `(min(nSlice, actualNSlice), h, w)`. 

89 Input `nSlice` can be unequal to the actual number of slices. 

90 Allow setting `offset` from head and `gap` between images in bytes. 

91 Allow change the endian-ness to the contrary of your machine by `swapEndian`. 

92 This function acts like ImageJ's `Import Raw`. 

93 ''' 

94 simpleValidate([ 

95 offset >= 0, 

96 h > 0 and w > 0 and nSlice > 0, 

97 ]) 

98 

99 slices = [] 

100 sliceBytes = h * w * np.dtype(dtype).itemsize # bytes per slice 

101 fileBytes = os.path.getsize(path) # actual bytes of the file 

102 

103 fp = open(path, 'rb') 

104 fp.seek(offset) 

105 for _ in range(nSlice): 

106 slices.append(np.frombuffer(fp.read(sliceBytes), dtype=dtype).reshape((h, w)).squeeze()) 

107 fp.seek(gap, os.SEEK_CUR) 

108 if fp.tell() >= fileBytes: 

109 break 

110 fp.close() 

111 

112 slices = np.array(slices).astype(dtype) 

113 

114 return slices if not swapEndian else slices.byteswap(inplace=True) 

115 

116 

117def imreadRaws(dir_: str, **kwargs: Any) -> NDArray: 

118 ''' Read series of raw images in directory. `kwargs` will be forwarded to `imreadRaw`. 

119 ''' 

120 imgs = [imreadRaw(x, **kwargs) for x in listDirectory(dir_, style='fullpath')] 

121 

122 return np.array(imgs) 

123 

124 

125@ConvertListNDArray 

126def imwriteRaw(img: TwoOrThreeD, path: str, dtype=None, order='CHW', swapEndian=False, fortranOrder=False): 

127 ''' Write `img` to raw file `path`. Convert dtype when `dtype` is not `None`. 

128 `order` can be [CHW or HWC]. 

129 Allow change the endian-ness to the contrary of your machine by `swapEndian`. 

130 Allow using Fortran order (Column-major) by setting `fortranOrder`. 

131 ''' 

132 cripAssert(order in ['CHW', 'HWC'], f'Invalid order: {order}.') 

133 

134 if dtype is not None: 

135 img = img.astype(dtype) 

136 if order == 'HWC': 

137 img = chw2hwc(img) 

138 if swapEndian: 

139 img = img.byteswap(inplace=False) 

140 

141 with open(path, 'wb') as fp: 

142 fp.write(img.tobytes('F' if fortranOrder else 'C')) 

143 

144 

145def imreadTiff(path: str, dtype=None) -> NDArray: 

146 ''' Read TIFF file, return numpy array. Convert dtype when `dtype` is not `None`. 

147 ''' 

148 if dtype is not None: 

149 return np.array(tifffile.imread(path)).astype(dtype) 

150 else: 

151 return np.array(tifffile.imread(path)) 

152 

153 

154def imreadTiffs(dir_: str, **kwargs) -> NDArray: 

155 ''' Read series of tiff images in directory. `kwargs` will be forwarded to `imreadTiff`. 

156 ''' 

157 imgs = [imreadTiff(x, **kwargs) for x in listDirectory(dir_, style='fullpath')] 

158 

159 return np.array(imgs) 

160 

161 

162@ConvertListNDArray 

163def imwriteTiff(img: TwoOrThreeD, path: str, dtype=None): 

164 ''' Write TIFF file. Convert dtype if `dtype` is not `None`. 

165 All floating dtype will be converted to float32. 

166 ''' 

167 if dtype is not None: 

168 img = img.astype(dtype) 

169 if isFloatDtype(img.dtype): 

170 img = img.astype(np.float32) 

171 

172 tifffile.imwrite(path, img) 

173 

174 

175def readEVI(path: str) -> Tuple[NDArray, Dict[str, Any]]: 

176 ''' Read EVI file saved by XCounter Hydra PCD detector. Return the images and metadata. 

177 ''' 

178 metadata = { 

179 'ImageType': None, 

180 'Width': None, 

181 'Height': None, 

182 'NumberOfImages': None, 

183 'OffsetToFirstImage': None, 

184 'GapBetweenImages': None, 

185 'FrameRate': None, 

186 'LowThreshold': None, 

187 'HighThreshold': None, 

188 } 

189 

190 # mapping between EVI file prelude and metadata to fill in 

191 mapping = { 

192 'Image_Type': 'ImageType', 

193 'Width': 'Width', 

194 'Height': 'Height', 

195 'Nr_of_images': 'NumberOfImages', 

196 'Offset_To_First_Image': 'OffsetToFirstImage', 

197 'Gap_between_iamges_in_bytes': 'GapBetweenImages', # for compatibility 

198 'Gap_between_images_in_bytes': 'GapBetweenImages', 

199 'Frame_Rate_HZ': 'FrameRate', 

200 'Low_Thresholds_KV': 'LowThreshold', 

201 'High_Thresholds_KV': 'HighThreshold', 

202 } 

203 

204 take = lambda str, idx: str.split(' ')[idx] 

205 with open(path, 'r', encoding='utf-8', errors='ignore') as fp: 

206 line = fp.readline().strip() 

207 while len(line): 

208 if line == 'COMMENT': 

209 break 

210 

211 key = take(line, 0) 

212 if key in mapping: 

213 val = take(line, 1) 

214 metadata[mapping[key]] = int(val) if str.isdigit(val) else val 

215 

216 line = fp.readline().strip() 

217 

218 nones = list(filter(lambda x: x[1] is None, metadata.items())) 

219 cripAssert(len(nones) == 0, f'Failed to find metadata for {list(map(lambda x: x[0], nones))}') 

220 cripAssert(metadata['ImageType'] in ['Single', 'Double'], f'Unsupported ImageType: {metadata["ImageType"]}') 

221 

222 dtype = {'Single': np.float32, 'Double': np.float64} 

223 img = imreadRaw(path, metadata['Height'], metadata['Width'], dtype[metadata['ImageType']], 

224 metadata['NumberOfImages'], metadata['OffsetToFirstImage'], metadata['GapBetweenImages']) 

225 

226 return img, metadata 

227 

228 

229def imreadEVI(path: str) -> NDArray: 

230 ''' Read EVI file saved by XCounter Hydra PCD detector. Return the images only. 

231 ''' 

232 return readEVI(path)[0] 

233 

234 

235# These CT parameters can be retrieved. 

236CTPARAMS = { 

237 # Manufacturer 

238 'Manufacturer': ([0x0008, 0x0070], str), 

239 'Manufacturer Model Name': ([0x0008, 0x1090], str), 

240 

241 # Study 

242 'Series Instance UID': ([0x0020, 0x000E], str), 

243 

244 # Patient status 

245 'Body Part Examined': ([0x0018, 0x0015], str), 

246 'Patient Position': ([0x0018, 0x5100], str), # (H/F)F(S/P) 

247 

248 # X-Ray exposure 

249 'KVP': ([0x0018, 0x0060], float), 

250 'X Ray Tube Current': ([0x0018, 0x1151], float), # mA 

251 'Exposure Time': ([0x0018, 0x1150], float), 

252 'Exposure': ([0x0018, 0x1152], float), 

253 

254 # CT Reconstruction 

255 'Slice Thickness': ([0x0018, 0x0050], float), 

256 'Data Collection Diameter': ([0x0018, 0x0090], float), 

257 'Reconstruction Diameter': ([0x0018, 0x1100], float), 

258 'Rows': ([0x0028, 0x0010], int), 

259 'Columns': ([0x0028, 0x0011], int), 

260 'Pixel Spacing': ([0x0028, 0x0030], str), # u/v, mm 

261 'Distance Source To Detector': ([0x0018, 0x1110], float), # mm 

262 'Distance Source To Patient': ([0x0018, 0x1111], float), # mm 

263 'Rotation Direction': ([0x0018, 0x1140], str), # CW/CCW 

264 'Bits Allocated': ([0x0028, 0x0100], int), 

265 

266 # Table 

267 'Table Height': ([0x0018, 0x1130], float), 

268 'Table Speed': ([0x0018, 0x9309], float), 

269 'Table Feed Per Rotation': ([0x0018, 0x9310], float), 

270 

271 # CT value rescaling 

272 'Rescale Intercept': ([0x0028, 0x1052], float), 

273 'Rescale Slope': ([0x0028, 0x1053], float), 

274 'Rescale Type': ([0x0028, 0x1054], str), 

275 

276 # Helical CT 

277 'Spiral Pitch Factor': ([0x0018, 0x9311], float) 

278} 

279 

280 

281def fetchCTParam(dicom: pydicom.Dataset, key: str) -> Any: 

282 ''' Fetch CT metadata from DICOM Dataset read by `readDicom` 

283 Refer to `CTPARAMS` for available keys. 

284 ''' 

285 cripAssert(key in CTPARAMS.keys(), f'Invalid key: {key}.') 

286 

287 addr, cast = CTPARAMS[key] 

288 

289 return cast(dicom[addr[0], addr[1]].value)