|
12 | 12 | logger = logging.getLogger() |
13 | 13 |
|
14 | 14 |
|
15 | | -def frame_reader( |
16 | | - cine_file: Union[str, bytes, PathLike], |
17 | | - header: Header, |
18 | | - start_frame: int = 1, |
19 | | - count: int = None, |
20 | | -) -> Generator[np.ndarray, Any, None]: |
21 | | - frame = start_frame |
22 | | - if not count: |
23 | | - count = header["cinefileheader"].ImageCount |
| 15 | +class Frame_reader: |
| 16 | + def __init__(self, cine_file, header=None, start_index=0, count=None, post_processing=None): |
| 17 | + """ |
| 18 | + Create an object for reading frames from a cine file. It can either be used to get specific frames with __getitem__ or as an iterator. |
| 19 | +
|
| 20 | + Parameters |
| 21 | + ---------- |
| 22 | + cine_file : str or file-like object |
| 23 | + A string containing a path to a cine file |
| 24 | + header : dict (optional) |
| 25 | + A dictionary contains header information of the cine file |
| 26 | + start_index : int |
| 27 | + First image in a pile of images in cine file. Only used with the iterator in the object. |
| 28 | + count : int |
| 29 | + Maximum number of frames to get if this object is used as an iterator. |
| 30 | + post_processing : function |
| 31 | + Function that takes one image parameter and returns a new processed image |
| 32 | + If provided this function will be applied to the raw image before returning. |
| 33 | +
|
| 34 | + Returns |
| 35 | + ------- |
| 36 | + the created object |
| 37 | + """ |
| 38 | + self.cine_file = cine_file |
| 39 | + self.cine_file_stream = open(cine_file, "rb") |
| 40 | + self.post_processing = post_processing |
| 41 | + |
| 42 | + if header is None: |
| 43 | + header = read_header(cine_file) |
| 44 | + self.header = header |
| 45 | + |
| 46 | + self.start_index = start_index |
| 47 | + self.i = self.start_index |
| 48 | + |
| 49 | + self.full_size = self.header["cinefileheader"].ImageCount |
| 50 | + if not count: |
| 51 | + count = self.full_size - self.start_index |
| 52 | + self.end_index = self.start_index + count |
| 53 | + if self.end_index > self.full_size: |
| 54 | + raise ValueError("end_index {} is larger than the maximum {}".format(self.end_index, self.full_size)) |
| 55 | + self.size = count |
| 56 | + |
| 57 | + def __getitem__(self, frame_index): |
| 58 | + """ |
| 59 | + Dunder method to be able to use [] for retrieving images from the object. |
| 60 | +
|
| 61 | + Parameters |
| 62 | + ---------- |
| 63 | + frame_index : int |
| 64 | + the index for the image in cine file to retrieve. |
| 65 | + """ |
| 66 | + f = self.cine_file_stream |
| 67 | + try: |
| 68 | + f.seek(self.header["pImage"][frame_index]) |
| 69 | + except IndexError: |
| 70 | + raise IndexError( |
| 71 | + "Index {} is out of bounds for cine_file with size {}.".format(frame_index, self.full_size) |
| 72 | + ) |
24 | 73 |
|
25 | | - with open(cine_file, "rb") as f: |
26 | | - while count: |
27 | | - frame_index = frame - 1 |
28 | | - logger.debug(f"Reading frame {frame}") |
| 74 | + annotation_size = struct.unpack("I", f.read(4))[0] |
| 75 | + annotation = struct.unpack("{}B".format(annotation_size - 8), f.read((annotation_size - 8) // 8)) |
| 76 | + self.header["Annotation"] = annotation |
29 | 77 |
|
30 | | - f.seek(header["pImage"][frame_index]) |
| 78 | + image_size = struct.unpack("I", f.read(4))[0] |
| 79 | + data = f.read(image_size) |
| 80 | + image = create_raw_array(data, self.header) |
| 81 | + if not self.post_processing is None: |
| 82 | + image = self.post_processing(image) |
| 83 | + return image |
31 | 84 |
|
32 | | - annotation_size = struct.unpack("I", f.read(4))[0] |
33 | | - annotation = struct.unpack(f"{annotation_size - 8}B", f.read((annotation_size - 8) // 8)) |
34 | | - # TODO: Save annotations |
| 85 | + def __iter__(self): |
| 86 | + """ Object of this class is an iterator """ |
| 87 | + return self |
35 | 88 |
|
36 | | - image_size = struct.unpack("I", f.read(4))[0] |
| 89 | + def __next__(self): |
| 90 | + """ When iterating get the next image if more are left """ |
| 91 | + if self.i >= self.end_index: |
| 92 | + raise StopIteration |
| 93 | + logger.debug("Reading frame {}".format(self.i)) |
| 94 | + raw_image = self.__getitem__(self.i) |
| 95 | + self.i += 1 |
| 96 | + return raw_image |
37 | 97 |
|
38 | | - data = f.read(image_size) |
| 98 | + def __len__(self): |
| 99 | + return self.size |
39 | 100 |
|
40 | | - raw_image = create_raw_array(data, header) |
| 101 | + def __del__(self): |
| 102 | + self.cine_file_stream.close() |
41 | 103 |
|
42 | | - yield raw_image |
43 | | - frame += 1 |
44 | | - count -= 1 |
| 104 | + |
| 105 | +# def frame_reader(cine_file, header=None, start_frame=1, count=None): |
| 106 | +def frame_reader( |
| 107 | + cine_file: Union[str, bytes, PathLike], |
| 108 | + header: Header, |
| 109 | + start_frame: int = 1, |
| 110 | + count: int = None, |
| 111 | +) -> Generator[np.ndarray, Any, None]: |
| 112 | + return Frame_reader(cine_file, header, start_frame - 1, count) |
45 | 113 |
|
46 | 114 |
|
47 | 115 | def read_bpp(header): |
|
0 commit comments