Coverage for crip\io.py: 25%
106 statements
« prev ^ index » next coverage.py v7.5.2, created at 2024-07-16 01:15 +0800
« prev ^ index » next coverage.py v7.5.2, created at 2024-07-16 01:15 +0800
1'''
2 I/O module of crip.
4 https://github.com/SEU-CT-Recon/crip
5'''
7import os
8import re
9import numpy as np
10import tifffile
11import pydicom
12import natsort
14from .utils import *
15from ._typing import *
18def listDirectory(folder: str,
19 style='filename',
20 match: Or[re.Pattern, str, None] = None,
21 sort='nat',
22 reverse=False) -> Or[List[str], Iterable[Tuple[str, str]]]:
23 ''' List files under `folder` and `sort` in `nat`(ural) or `dict`(ionary) order.
24 Return `style` can be `filename`, `fullpath` or `both` (path, name) tuple.
25 Results can be filtered by `match` and reversed by `reverse`.
26 '''
27 cripAssert(sort in ['nat', 'dict'], f'Invalid `sort`: {sort}.')
28 cripAssert(style in ['filename', 'fullpath', 'both'], f'Invalid `style`: {style}.')
30 files = os.listdir(folder)
32 if match is not None:
33 if isinstance(match, str):
34 match = re.compile(match)
35 files = list(filter(lambda x: match.search(x), files))
37 files = sorted(files, reverse=reverse) if sort == 'dict' else natsort.natsorted(
38 files, reverse=reverse, alg=natsort.ns.DEFAULT)
40 if style == 'filename':
41 return files
42 elif style == 'fullpath':
43 return [os.path.join(folder, file) for file in files]
44 elif style == 'both':
45 return zip([os.path.join(folder, file) for file in files], files)
48def imreadDicom(path: str, dtype=None, attrs: Or[None, Dict[str, Any]] = None) -> NDArray:
49 ''' Read DICOM file, return numpy array. Use `attrs` to supplement DICOM tags for non-standard images.
50 You should be very careful whether the Rescale is cancelled for CT images.
51 Convert dtype when `dtype` is not `None`.
52 '''
53 dcm = pydicom.dcmread(path)
55 if attrs is not None:
56 for key in attrs:
57 dcm.__setattr__(key, attrs[key])
59 if dtype is not None:
60 return np.array(dcm.pixel_array).astype(dtype)
61 else:
62 return np.array(dcm.pixel_array)
65def imreadDicoms(dir_: str, **kwargs) -> NDArray:
66 ''' Read series of DICOM files in a directory. `kwargs` is forwarded to `imreadDicom`.
67 '''
68 imgs = [imreadDicom(x, **kwargs) for x in listDirectory(dir_, style='fullpath')]
70 return np.array(imgs)
73def readDicom(path: str) -> pydicom.Dataset:
74 ''' Read DICOM file as pydicom Dataset object.
75 '''
76 return pydicom.read_file(path)
79def imreadRaw(path: str,
80 h: int,
81 w: int,
82 dtype=DefaultFloatDType,
83 nSlice: int = 1,
84 offset: int = 0,
85 gap: int = 0,
86 swapEndian=False) -> NDArray:
87 ''' Read binary raw file stored in `dtype`.
88 Return numpy array with shape `(min(nSlice, actualNSlice), h, w)`.
89 Input `nSlice` can be unequal to the actual number of slices.
90 Allow setting `offset` from head and `gap` between images in bytes.
91 Allow change the endian-ness to the contrary of your machine by `swapEndian`.
92 This function acts like ImageJ's `Import Raw`.
93 '''
94 simpleValidate([
95 offset >= 0,
96 h > 0 and w > 0 and nSlice > 0,
97 ])
99 slices = []
100 sliceBytes = h * w * np.dtype(dtype).itemsize # bytes per slice
101 fileBytes = os.path.getsize(path) # actual bytes of the file
103 fp = open(path, 'rb')
104 fp.seek(offset)
105 for _ in range(nSlice):
106 slices.append(np.frombuffer(fp.read(sliceBytes), dtype=dtype).reshape((h, w)).squeeze())
107 fp.seek(gap, os.SEEK_CUR)
108 if fp.tell() >= fileBytes:
109 break
110 fp.close()
112 slices = np.array(slices).astype(dtype)
114 return slices if not swapEndian else slices.byteswap(inplace=True)
117def imreadRaws(dir_: str, **kwargs: Any) -> NDArray:
118 ''' Read series of raw images in directory. `kwargs` will be forwarded to `imreadRaw`.
119 '''
120 imgs = [imreadRaw(x, **kwargs) for x in listDirectory(dir_, style='fullpath')]
122 return np.array(imgs)
125@ConvertListNDArray
126def imwriteRaw(img: TwoOrThreeD, path: str, dtype=None, order='CHW', swapEndian=False, fortranOrder=False):
127 ''' Write `img` to raw file `path`. Convert dtype when `dtype` is not `None`.
128 `order` can be [CHW or HWC].
129 Allow change the endian-ness to the contrary of your machine by `swapEndian`.
130 Allow using Fortran order (Column-major) by setting `fortranOrder`.
131 '''
132 cripAssert(order in ['CHW', 'HWC'], f'Invalid order: {order}.')
134 if dtype is not None:
135 img = img.astype(dtype)
136 if order == 'HWC':
137 img = chw2hwc(img)
138 if swapEndian:
139 img = img.byteswap(inplace=False)
141 with open(path, 'wb') as fp:
142 fp.write(img.tobytes('F' if fortranOrder else 'C'))
145def imreadTiff(path: str, dtype=None) -> NDArray:
146 ''' Read TIFF file, return numpy array. Convert dtype when `dtype` is not `None`.
147 '''
148 if dtype is not None:
149 return np.array(tifffile.imread(path)).astype(dtype)
150 else:
151 return np.array(tifffile.imread(path))
154def imreadTiffs(dir_: str, **kwargs) -> NDArray:
155 ''' Read series of tiff images in directory. `kwargs` will be forwarded to `imreadTiff`.
156 '''
157 imgs = [imreadTiff(x, **kwargs) for x in listDirectory(dir_, style='fullpath')]
159 return np.array(imgs)
162@ConvertListNDArray
163def imwriteTiff(img: TwoOrThreeD, path: str, dtype=None):
164 ''' Write TIFF file. Convert dtype if `dtype` is not `None`.
165 All floating dtype will be converted to float32.
166 '''
167 if dtype is not None:
168 img = img.astype(dtype)
169 if isFloatDtype(img.dtype):
170 img = img.astype(np.float32)
172 tifffile.imwrite(path, img)
175def readEVI(path: str) -> Tuple[NDArray, Dict[str, Any]]:
176 ''' Read EVI file saved by XCounter Hydra PCD detector. Return the images and metadata.
177 '''
178 metadata = {
179 'ImageType': None,
180 'Width': None,
181 'Height': None,
182 'NumberOfImages': None,
183 'OffsetToFirstImage': None,
184 'GapBetweenImages': None,
185 'FrameRate': None,
186 'LowThreshold': None,
187 'HighThreshold': None,
188 }
190 # mapping between EVI file prelude and metadata to fill in
191 mapping = {
192 'Image_Type': 'ImageType',
193 'Width': 'Width',
194 'Height': 'Height',
195 'Nr_of_images': 'NumberOfImages',
196 'Offset_To_First_Image': 'OffsetToFirstImage',
197 'Gap_between_iamges_in_bytes': 'GapBetweenImages', # for compatibility
198 'Gap_between_images_in_bytes': 'GapBetweenImages',
199 'Frame_Rate_HZ': 'FrameRate',
200 'Low_Thresholds_KV': 'LowThreshold',
201 'High_Thresholds_KV': 'HighThreshold',
202 }
204 take = lambda str, idx: str.split(' ')[idx]
205 with open(path, 'r', encoding='utf-8', errors='ignore') as fp:
206 line = fp.readline().strip()
207 while len(line):
208 if line == 'COMMENT':
209 break
211 key = take(line, 0)
212 if key in mapping:
213 val = take(line, 1)
214 metadata[mapping[key]] = int(val) if str.isdigit(val) else val
216 line = fp.readline().strip()
218 nones = list(filter(lambda x: x[1] is None, metadata.items()))
219 cripAssert(len(nones) == 0, f'Failed to find metadata for {list(map(lambda x: x[0], nones))}')
220 cripAssert(metadata['ImageType'] in ['Single', 'Double'], f'Unsupported ImageType: {metadata["ImageType"]}')
222 dtype = {'Single': np.float32, 'Double': np.float64}
223 img = imreadRaw(path, metadata['Height'], metadata['Width'], dtype[metadata['ImageType']],
224 metadata['NumberOfImages'], metadata['OffsetToFirstImage'], metadata['GapBetweenImages'])
226 return img, metadata
229def imreadEVI(path: str) -> NDArray:
230 ''' Read EVI file saved by XCounter Hydra PCD detector. Return the images only.
231 '''
232 return readEVI(path)[0]
235# These CT parameters can be retrieved.
236CTPARAMS = {
237 # Manufacturer
238 'Manufacturer': ([0x0008, 0x0070], str),
239 'Manufacturer Model Name': ([0x0008, 0x1090], str),
241 # Study
242 'Series Instance UID': ([0x0020, 0x000E], str),
244 # Patient status
245 'Body Part Examined': ([0x0018, 0x0015], str),
246 'Patient Position': ([0x0018, 0x5100], str), # (H/F)F(S/P)
248 # X-Ray exposure
249 'KVP': ([0x0018, 0x0060], float),
250 'X Ray Tube Current': ([0x0018, 0x1151], float), # mA
251 'Exposure Time': ([0x0018, 0x1150], float),
252 'Exposure': ([0x0018, 0x1152], float),
254 # CT Reconstruction
255 'Slice Thickness': ([0x0018, 0x0050], float),
256 'Data Collection Diameter': ([0x0018, 0x0090], float),
257 'Reconstruction Diameter': ([0x0018, 0x1100], float),
258 'Rows': ([0x0028, 0x0010], int),
259 'Columns': ([0x0028, 0x0011], int),
260 'Pixel Spacing': ([0x0028, 0x0030], str), # u/v, mm
261 'Distance Source To Detector': ([0x0018, 0x1110], float), # mm
262 'Distance Source To Patient': ([0x0018, 0x1111], float), # mm
263 'Rotation Direction': ([0x0018, 0x1140], str), # CW/CCW
264 'Bits Allocated': ([0x0028, 0x0100], int),
266 # Table
267 'Table Height': ([0x0018, 0x1130], float),
268 'Table Speed': ([0x0018, 0x9309], float),
269 'Table Feed Per Rotation': ([0x0018, 0x9310], float),
271 # CT value rescaling
272 'Rescale Intercept': ([0x0028, 0x1052], float),
273 'Rescale Slope': ([0x0028, 0x1053], float),
274 'Rescale Type': ([0x0028, 0x1054], str),
276 # Helical CT
277 'Spiral Pitch Factor': ([0x0018, 0x9311], float)
278}
281def fetchCTParam(dicom: pydicom.Dataset, key: str) -> Any:
282 ''' Fetch CT metadata from DICOM Dataset read by `readDicom`
283 Refer to `CTPARAMS` for available keys.
284 '''
285 cripAssert(key in CTPARAMS.keys(), f'Invalid key: {key}.')
287 addr, cast = CTPARAMS[key]
289 return cast(dicom[addr[0], addr[1]].value)