Skip to content

API Reference

Welcome to the complete API reference for BayerDithering.

Types & Aliases

These custom type aliases are used throughout the library to ensure static typing safety and make function signatures cleaner.

BayerDithering.core.MediaInput: TypeAlias = Union[NDArray[np.uint8], cv2.VideoCapture, Any] module-attribute

Represents the accepted input formats for the dithering process. Can be a raw NumPy image array, an OpenCV VideoCapture stream, or an imageio GIF reader.

BayerDithering.core.MediaOutput: TypeAlias = Union[NDArray[np.uint8], ProcessedVideo, ProcessedGIF] module-attribute

Represents the possible outputs returned by the dithering orchestrator, depending on the input provided.

BayerDithering.core.RGBColor: TypeAlias = tuple[int, int, int] module-attribute

A standard RGB color representation using a tuple of three integers (0-255).

BayerDithering.core.ColorFilter: TypeAlias = tuple[RGBColor, RGBColor] module-attribute

A duotone palette consisting of two RGBColor tuples: (light_color, dark_color).

Core Processing

Below are the primary classes responsible for orchestrating the dithering process and configuring the media pipeline.

BayerDithering.DitherConfig dataclass

Main configuration for the Bayer Dithering process.

Attributes:

Name Type Description
b_matrix NDArray[float32]

Normalized Bayer matrix (values from 0.0 to 1.0).

contrast float

Contrast multiplier applied before dithering (default: 1.5).

sharpness float

Intensity of the unsharp mask filter (default: 1.6).

downscale_factor int

Factor by which the input is downscaled before processing (default: 1).

upscale bool

If True, resizes the processed image back to its original dimensions (default: False).

filter Optional[ColorFilter]

Tuple with two RGB colors (light, dark) to apply as a duotone palette (default: None).

Source code in BayerDithering/core.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@dataclass
class DitherConfig:
    """Main configuration for the Bayer Dithering process.

    Attributes:
        b_matrix (NDArray[np.float32]): Normalized Bayer matrix (values from 0.0 to 1.0).
        contrast (float): Contrast multiplier applied before dithering (default: 1.5).
        sharpness (float): Intensity of the unsharp mask filter (default: 1.6).
        downscale_factor (int): Factor by which the input is downscaled before processing (default: 1).
        upscale (bool): If True, resizes the processed image back to its original dimensions (default: False).
        filter (Optional[ColorFilter]): Tuple with two RGB colors (light, dark) to apply as a duotone palette (default: None).
    """

    b_matrix: NDArray[np.float32]
    contrast: float = 1.5
    sharpness: float = 1.6
    downscale_factor: int = 1
    upscale: bool = False
    filter: Optional[ColorFilter] = None

BayerDithering.BayerDither

Main orchestrator for the Bayer Dithering library.

Handles media routing (images vs. video) and delegates the actual computation to the injected processor (CPU or GPU).

Source code in BayerDithering/dither.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
class BayerDither():
    """Main orchestrator for the Bayer Dithering library.

    Handles media routing (images vs. video) and delegates the actual 
    computation to the injected processor (CPU or GPU).
    """
    def __init__(self, processor: MediaProcessor, verbose: bool = False) -> None:
        """Initializes the Dithering orchestrator.

        Args:
            processor (MediaProcessor): An initialized instance of CPUProcessor or GPUProcessor.
            verbose (bool, optional): Enables detailed DEBUG logging. Defaults to False.
        """

        self.logger = logging.getLogger(__name__)
        level = logging.DEBUG if verbose else logging.WARNING
        self.logger.setLevel(level)

        self.processor = processor
        self.config = self.processor.config

        self.device = "gpu" if isinstance(processor, GPUProcessor) else "cpu"

    def _get_video_metadata(self, video: cv2.VideoCapture) -> tuple[int, int, int, float]:
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = video.get(cv2.CAP_PROP_FPS)

        if fps <= 0:
            self.logger.warning("FPS invalid or not detected, defaulting to 30")
            fps = 30

        self.logger.debug(
            f"Video info: {width}x{height} @ {fps} FPS | total_frames={total_frames}"
        )

        return width, height, total_frames, fps

    def _create_writer(self, width, height, fps) -> tuple[cv2.VideoWriter, str]:
        fd, temp_path = tempfile.mkstemp(suffix=".mp4")
        os.close(fd)

        is_color = self.processor.config.filter is not None
        if isinstance(self.processor, GPUProcessor):
            is_color = True

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")

        out = cv2.VideoWriter(
            filename=temp_path,
            fourcc=fourcc,
            fps=fps,
            frameSize=(width, height),
            isColor=is_color
        )

        return out, temp_path

    def _compute_output_size(self, width: int, height: int) -> tuple[int, int]:
        if (self.processor.config.downscale_factor > 1 and not self.processor.config.upscale):

            new_width = max(1, width // self.processor.config.downscale_factor)
            new_height = max(1, height // self.processor.config.downscale_factor)

            self.logger.debug(
                f"Downscaling output to {new_width}x{new_height} "
                f"(factor={self.processor.config.downscale_factor})"
            )

            return new_width, new_height

        return width, height

    def _apply_to_video_cpu(self, video: cv2.VideoCapture) -> ProcessedVideo:
        processor: CPUProcessor = self.processor
        start: float = time.time()

        video.set(cv2.CAP_PROP_POS_FRAMES, 0)

        width, height, total_frames, fps = self._get_video_metadata(video)
        width, height = self._compute_output_size(width, height)

        out, temp_path = self._create_writer(width, height, fps)

        frame_counter: int = 0

        while True:
            ret, frame = video.read()
            if not ret:
                break

            processed: NDArray[np.uint8] = processor.process_frame(frame)
            out.write(processed)
            frame_counter += 1

            if frame_counter % 100 == 0 or frame_counter == total_frames:
                self.logger.debug(f"Writer progress: {frame_counter}/{total_frames}")

        out.release()

        elapsed: float = time.time() - start
        fps_eff: float = total_frames / elapsed if elapsed > 0 else 0

        self.logger.info("Video processing completed")
        self.logger.info(f"Processing completed in {elapsed:.2f}s | Effective FPS: {fps_eff:.2f}")
        self.logger.debug(f"Temporary file created at: {temp_path}")

        return ProcessedVideo(temp_path, self.logger.getEffectiveLevel())

    def _apply_to_video_gpu(self, video: cv2.VideoCapture) -> ProcessedVideo:
        processor: GPUProcessor = self.processor
        start: float = time.time()
        video.set(cv2.CAP_PROP_POS_FRAMES, 0)

        width, height, total_frames, fps = self._get_video_metadata(video)

        downscaled_w = max(1, width // self.config.downscale_factor)
        downscaled_h = max(1, height // self.config.downscale_factor)

        writer_width: int = width if self.config.upscale else downscaled_w
        writer_height: int = height if self.config.upscale else downscaled_h

        out, temp_path = self._create_writer(writer_width, writer_height, fps)

        batch_size: int = 128
        frame_counter: int = 0
        batch: list[NDArray[np.uint8]] = []

        while True:
            ret, frame = video.read()
            if not ret:
                break

            if len(batch) < batch_size:
                batch.append(frame)
            else:
                result_batch = processor.process_video_batch(batch)

                for processed_frame in result_batch:
                    if self.config.upscale:
                        processed_frame = cv2.resize(
                            processed_frame,
                            (width, height),
                            interpolation=cv2.INTER_NEAREST
                        )
                    out.write(processed_frame)
                    frame_counter += 1

                    if frame_counter % 100 == 0 or frame_counter == total_frames:
                        self.logger.debug(f"Writer progress: {frame_counter}/{total_frames}")

                batch.clear()
                batch.append(frame)

        if batch:
            result_batch = processor.process_video_batch(batch)

            for processed_frame in result_batch:
                if self.config.upscale:
                    processed_frame = cv2.resize(
                        processed_frame,
                        (width, height),
                        interpolation=cv2.INTER_NEAREST
                    )
                out.write(processed_frame)
                frame_counter += 1

                if frame_counter % 100 == 0 or frame_counter == total_frames:
                    self.logger.debug(f"Writer progress: {frame_counter}/{total_frames}")

            batch.clear()

        out.release()

        elapsed: float = time.time() - start
        fps_eff: float = total_frames / elapsed if elapsed > 0 else 0

        self.logger.info("Video processing completed")
        self.logger.info(f"Processing completed in {elapsed:.2f}s | Effective FPS: {fps_eff:.2f}")
        self.logger.debug(f"Temporary file created at: {temp_path}")

        return ProcessedVideo(temp_path, self.logger.getEffectiveLevel())

    def apply_to_video(self, video: cv2.VideoCapture) -> ProcessedVideo:
        """Applies the dithering process to an entire video stream.

        Args:
            video (cv2.VideoCapture): An opened OpenCV VideoCapture object.

        Returns:
            ProcessedVideo: A wrapper object containing the path to the processed temporary video file.
        """

        config: DitherConfig = self.config

        self.logger.info(
            f"Starting video processing | "
            f"device={self.device} | "
            f"matrix={config.b_matrix.shape} | "
            f"contrast={config.contrast} | "
            f"sharpness={config.sharpness} | "
            f"downscale={config.downscale_factor} | "
            f"upscale={config.upscale} | "
            f"filter={'yes' if config.filter else 'no'}"
        )

        if self.device == "gpu":
            result = self._apply_to_video_gpu(video)
        else:
            result = self._apply_to_video_cpu(video)

        return result

    def apply_to_image(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
        """Applies the dithering process to a single image array.

        Args:
            image (NDArray[np.uint8]): The source image as a NumPy array (BGR format).

        Returns:
            NDArray[np.uint8]: The processed image as a NumPy array.
        """

        config: DitherConfig = self.config

        self.logger.info(
            f"Starting image processing | "
            f"device={self.device} | "
            f"matrix={config.b_matrix.shape} | "
            f"contrast={config.contrast} | "
            f"sharpness={config.sharpness} | "
            f"downscale={config.downscale_factor} | "
            f"upscale={config.upscale} | "
            f"filter={'yes' if config.filter else 'no'}"
        )

        size_mb: float = image.nbytes / (1024 * 1024)
        h, w = image.shape[:2]
        channels: int = 1 if len(image.shape) == 2 else image.shape[2]

        self.logger.info(f"Image info: "
            f"resolution={w}x{h} | "
            f"channels={channels} | "
            f"size={size_mb:.2f}MB | " 
            f"dtype={image.dtype}"
        )

        start: float = time.time()

        result: NDArray[np.uint8] = self.processor.process_frame(image)

        elapsed : float = time.time() - start
        self.logger.debug(f"Processing completed in {elapsed:.2f}s")

        return result

    def apply_to_gif(self, gif_reader: imageio.v2.LegacyReader) -> ProcessedGIF:
        """Applies the dithering process to an animated GIF stream.

        Args:
            gif_reader: An opened imageio Reader object.

        Returns:
            ProcessedGIF: A wrapper object containing the path to the temporary processed GIF.
        """

        config: DitherConfig = self.config

        self.logger.info(
            f"Starting GIF processing | "
            f"device={self.device} | " 
            f"matrix={config.b_matrix.shape} | "
            f"contrast={config.contrast} | "
            f"sharpness={config.sharpness} | "
            f"downscale={config.downscale_factor} | "
            f"upscale={config.upscale} | "
            f"filter={'yes' if config.filter else 'no'}"
        )

        start = time.time()

        meta = gif_reader.get_meta_data()
        duration = meta.get('duration', 100) # ms per frame
        loop = meta.get('loop', 0)

        processed_frames = []
        batch = []
        batch_size = 128 if self.device == "gpu" else 1

        for frame in gif_reader:
            if frame.shape[-1] == 4:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
            else:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

            if self.device == "cpu":
                proc = self._apply_to_video_cpu_frame(frame)
                processed_frames.append(proc)
            else:
                batch.append(frame)
                if len(batch) == batch_size:
                    res_batch = self.processor.process_video_batch(batch)
                    for idx, r in enumerate(res_batch):
                        proc = self._post_process_frame(r, batch[idx].shape[:2])
                        processed_frames.append(proc)
                    batch.clear()

        if batch and self.device == "gpu":
            res_batch = self.processor.process_video_batch(batch)
            for idx, r in enumerate(res_batch):
                proc = self._post_process_frame(r, batch[idx].shape[:2])
                processed_frames.append(proc)

        fd, temp_path = tempfile.mkstemp(suffix=".gif")
        os.close(fd)

        self.logger.debug("Writing GIF frames to temporary file...")
        with imageio.get_writer(temp_path, format='GIF', duration=duration, loop=loop) as writer:
            for f in processed_frames:
                writer.append_data(f)

        elapsed = time.time() - start
        self.logger.info(f"GIF processing completed in {elapsed:.2f}s")

        return ProcessedGIF(temp_path, self.logger.getEffectiveLevel())

    def _apply_to_video_cpu_frame(self, frame: NDArray[np.uint8]) -> NDArray[np.uint8]:
        """Helper to process a frame on CPU and convert back to RGB."""
        original_shape = frame.shape[:2]
        proc = self.processor.process_frame(frame)
        return self._post_process_frame(proc, original_shape)

    def _post_process_frame(self, frame: NDArray[np.uint8], original_shape: tuple[int, int]) -> NDArray[np.uint8]:
        """Helper to handle upscaling and BGR->RGB conversion for GIFs."""
        if self.config.upscale:
            frame = cv2.resize(frame, (original_shape[1], original_shape[0]), interpolation=cv2.INTER_NEAREST)
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    def apply(self, media: MediaInput) -> MediaOutput:
        """Unified method to apply the dithering effect to supported media streams.

        Args:
            media (MediaInput): cv2.VideoCapture, NDArray (image), or imageio.Reader (GIF).

        Returns:
            MediaOutput: The processed result (NDArray, ProcessedVideo, or ProcessedGIF).

        Raises:
            TypeError: If the media type is not supported.
        """

        if isinstance(media, cv2.VideoCapture):
            return self.apply_to_video(video=media)
        elif type(media) is np.ndarray:
            return self.apply_to_image(image=media)
        elif hasattr(media, 'get_meta_data') and hasattr(media, 'get_length'):
            return self.apply_to_gif(gif_reader=media)
        else:
            raise TypeError(
                f"Unsupported media type: {type(media)}"
            )

    @staticmethod
    def get_available_devices() -> list[str]:
        """Checks the system for supported processing hardware.

        Returns:
            list[str]: A list of available devices. Will always include 'cpu', 
                       and will include 'gpu' if Taichi detects a valid backend (CUDA, Vulkan, or Metal).
        """

        devices = ["cpu"]

        if not TAICHI_AVAILABLE:
            return devices


        # temporarily mute Taichi to prevent it from printing errors 
        # if the user doesn't have a GPU drivers installed.
        os.environ["TI_LOG_LEVEL"] = "ERROR"
        with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
            import taichi as ti
            try:
                # list of backend enums it can use on this machine
                archs = ti.supported_archs()

                # Check if there is any hardware accelerator beyond the CPU
                if any(arch in archs for arch in [ti.cuda, ti.vulkan, ti.metal, ti.opengl, ti.dx11]):
                    devices.append("gpu")
            except Exception:
                pass

        return devices

__init__(processor: MediaProcessor, verbose: bool = False) -> None

Initializes the Dithering orchestrator.

Parameters:

Name Type Description Default
processor MediaProcessor

An initialized instance of CPUProcessor or GPUProcessor.

required
verbose bool

Enables detailed DEBUG logging. Defaults to False.

False
Source code in BayerDithering/dither.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(self, processor: MediaProcessor, verbose: bool = False) -> None:
    """Initializes the Dithering orchestrator.

    Args:
        processor (MediaProcessor): An initialized instance of CPUProcessor or GPUProcessor.
        verbose (bool, optional): Enables detailed DEBUG logging. Defaults to False.
    """

    self.logger = logging.getLogger(__name__)
    level = logging.DEBUG if verbose else logging.WARNING
    self.logger.setLevel(level)

    self.processor = processor
    self.config = self.processor.config

    self.device = "gpu" if isinstance(processor, GPUProcessor) else "cpu"

apply(media: MediaInput) -> MediaOutput

Unified method to apply the dithering effect to supported media streams.

Parameters:

Name Type Description Default
media MediaInput

cv2.VideoCapture, NDArray (image), or imageio.Reader (GIF).

required

Returns:

Name Type Description
MediaOutput MediaOutput

The processed result (NDArray, ProcessedVideo, or ProcessedGIF).

Raises:

Type Description
TypeError

If the media type is not supported.

Source code in BayerDithering/dither.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def apply(self, media: MediaInput) -> MediaOutput:
    """Unified method to apply the dithering effect to supported media streams.

    Args:
        media (MediaInput): cv2.VideoCapture, NDArray (image), or imageio.Reader (GIF).

    Returns:
        MediaOutput: The processed result (NDArray, ProcessedVideo, or ProcessedGIF).

    Raises:
        TypeError: If the media type is not supported.
    """

    if isinstance(media, cv2.VideoCapture):
        return self.apply_to_video(video=media)
    elif type(media) is np.ndarray:
        return self.apply_to_image(image=media)
    elif hasattr(media, 'get_meta_data') and hasattr(media, 'get_length'):
        return self.apply_to_gif(gif_reader=media)
    else:
        raise TypeError(
            f"Unsupported media type: {type(media)}"
        )

apply_to_gif(gif_reader: imageio.v2.LegacyReader) -> ProcessedGIF

Applies the dithering process to an animated GIF stream.

Parameters:

Name Type Description Default
gif_reader LegacyReader

An opened imageio Reader object.

required

Returns:

Name Type Description
ProcessedGIF ProcessedGIF

A wrapper object containing the path to the temporary processed GIF.

Source code in BayerDithering/dither.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
def apply_to_gif(self, gif_reader: imageio.v2.LegacyReader) -> ProcessedGIF:
    """Applies the dithering process to an animated GIF stream.

    Args:
        gif_reader: An opened imageio Reader object.

    Returns:
        ProcessedGIF: A wrapper object containing the path to the temporary processed GIF.
    """

    config: DitherConfig = self.config

    self.logger.info(
        f"Starting GIF processing | "
        f"device={self.device} | " 
        f"matrix={config.b_matrix.shape} | "
        f"contrast={config.contrast} | "
        f"sharpness={config.sharpness} | "
        f"downscale={config.downscale_factor} | "
        f"upscale={config.upscale} | "
        f"filter={'yes' if config.filter else 'no'}"
    )

    start = time.time()

    meta = gif_reader.get_meta_data()
    duration = meta.get('duration', 100) # ms per frame
    loop = meta.get('loop', 0)

    processed_frames = []
    batch = []
    batch_size = 128 if self.device == "gpu" else 1

    for frame in gif_reader:
        if frame.shape[-1] == 4:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
        else:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        if self.device == "cpu":
            proc = self._apply_to_video_cpu_frame(frame)
            processed_frames.append(proc)
        else:
            batch.append(frame)
            if len(batch) == batch_size:
                res_batch = self.processor.process_video_batch(batch)
                for idx, r in enumerate(res_batch):
                    proc = self._post_process_frame(r, batch[idx].shape[:2])
                    processed_frames.append(proc)
                batch.clear()

    if batch and self.device == "gpu":
        res_batch = self.processor.process_video_batch(batch)
        for idx, r in enumerate(res_batch):
            proc = self._post_process_frame(r, batch[idx].shape[:2])
            processed_frames.append(proc)

    fd, temp_path = tempfile.mkstemp(suffix=".gif")
    os.close(fd)

    self.logger.debug("Writing GIF frames to temporary file...")
    with imageio.get_writer(temp_path, format='GIF', duration=duration, loop=loop) as writer:
        for f in processed_frames:
            writer.append_data(f)

    elapsed = time.time() - start
    self.logger.info(f"GIF processing completed in {elapsed:.2f}s")

    return ProcessedGIF(temp_path, self.logger.getEffectiveLevel())

apply_to_image(image: NDArray[np.uint8]) -> NDArray[np.uint8]

Applies the dithering process to a single image array.

Parameters:

Name Type Description Default
image NDArray[uint8]

The source image as a NumPy array (BGR format).

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: The processed image as a NumPy array.

Source code in BayerDithering/dither.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def apply_to_image(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
    """Applies the dithering process to a single image array.

    Args:
        image (NDArray[np.uint8]): The source image as a NumPy array (BGR format).

    Returns:
        NDArray[np.uint8]: The processed image as a NumPy array.
    """

    config: DitherConfig = self.config

    self.logger.info(
        f"Starting image processing | "
        f"device={self.device} | "
        f"matrix={config.b_matrix.shape} | "
        f"contrast={config.contrast} | "
        f"sharpness={config.sharpness} | "
        f"downscale={config.downscale_factor} | "
        f"upscale={config.upscale} | "
        f"filter={'yes' if config.filter else 'no'}"
    )

    size_mb: float = image.nbytes / (1024 * 1024)
    h, w = image.shape[:2]
    channels: int = 1 if len(image.shape) == 2 else image.shape[2]

    self.logger.info(f"Image info: "
        f"resolution={w}x{h} | "
        f"channels={channels} | "
        f"size={size_mb:.2f}MB | " 
        f"dtype={image.dtype}"
    )

    start: float = time.time()

    result: NDArray[np.uint8] = self.processor.process_frame(image)

    elapsed : float = time.time() - start
    self.logger.debug(f"Processing completed in {elapsed:.2f}s")

    return result

apply_to_video(video: cv2.VideoCapture) -> ProcessedVideo

Applies the dithering process to an entire video stream.

Parameters:

Name Type Description Default
video VideoCapture

An opened OpenCV VideoCapture object.

required

Returns:

Name Type Description
ProcessedVideo ProcessedVideo

A wrapper object containing the path to the processed temporary video file.

Source code in BayerDithering/dither.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def apply_to_video(self, video: cv2.VideoCapture) -> ProcessedVideo:
    """Applies the dithering process to an entire video stream.

    Args:
        video (cv2.VideoCapture): An opened OpenCV VideoCapture object.

    Returns:
        ProcessedVideo: A wrapper object containing the path to the processed temporary video file.
    """

    config: DitherConfig = self.config

    self.logger.info(
        f"Starting video processing | "
        f"device={self.device} | "
        f"matrix={config.b_matrix.shape} | "
        f"contrast={config.contrast} | "
        f"sharpness={config.sharpness} | "
        f"downscale={config.downscale_factor} | "
        f"upscale={config.upscale} | "
        f"filter={'yes' if config.filter else 'no'}"
    )

    if self.device == "gpu":
        result = self._apply_to_video_gpu(video)
    else:
        result = self._apply_to_video_cpu(video)

    return result

get_available_devices() -> list[str] staticmethod

Checks the system for supported processing hardware.

Returns:

Type Description
list[str]

list[str]: A list of available devices. Will always include 'cpu', and will include 'gpu' if Taichi detects a valid backend (CUDA, Vulkan, or Metal).

Source code in BayerDithering/dither.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
@staticmethod
def get_available_devices() -> list[str]:
    """Checks the system for supported processing hardware.

    Returns:
        list[str]: A list of available devices. Will always include 'cpu', 
                   and will include 'gpu' if Taichi detects a valid backend (CUDA, Vulkan, or Metal).
    """

    devices = ["cpu"]

    if not TAICHI_AVAILABLE:
        return devices


    # temporarily mute Taichi to prevent it from printing errors 
    # if the user doesn't have a GPU drivers installed.
    os.environ["TI_LOG_LEVEL"] = "ERROR"
    with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
        import taichi as ti
        try:
            # list of backend enums it can use on this machine
            archs = ti.supported_archs()

            # Check if there is any hardware accelerator beyond the CPU
            if any(arch in archs for arch in [ti.cuda, ti.vulkan, ti.metal, ti.opengl, ti.dx11]):
                devices.append("gpu")
        except Exception:
            pass

    return devices

Hardware Backends

The library delegates the heavy computational lifting to these MediaProcessor classes. The GPUProcessor leverages the Taichi framework for massive parallel execution, while the CPUProcessor uses optimized NumPy and OpenCV routines as a fallback.

BayerDithering.core.MediaProcessor

Bases: ABC

Base interface for all media processors (CPU and GPU).

Source code in BayerDithering/core.py
43
44
45
46
47
48
49
50
class MediaProcessor(ABC):
    """Base interface for all media processors (CPU and GPU)."""
    def __init__(self, config: DitherConfig) -> None:
        self.config = config

    @abstractmethod
    def process_frame(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
        raise NotImplementedError

BayerDithering.CPUProcessor

Bases: MediaProcessor

Pure CPU implementation for Bayer Dithering.

Ideal for processing single images or when hardware acceleration (GPU) is unavailable.

Source code in BayerDithering/cpu.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class CPUProcessor(MediaProcessor):
    """Pure CPU implementation for Bayer Dithering.

    Ideal for processing single images or when hardware acceleration (GPU) is unavailable.
    """

    def __init__(self, config: DitherConfig) -> None:
        super().__init__(config)

        self.bayer_matrix_u8 = (self.config.b_matrix * 255).astype(np.uint8)

    @staticmethod
    def apply_color_filter(image: NDArray[np.uint8], colors: ColorFilter) -> NDArray[np.uint8]:
        """Applies a duotone filter to a binarized image.

        Args:
            image (NDArray[np.uint8]): Binarized grayscale image (values 0 and 255 only).
            colors (ColorFilter): A tuple containing (light_color_rgb, dark_color_rgb).

        Returns:
            NDArray[np.uint8]: Colored image in BGR format.
        """

        light, dark = colors  # (R,G,B), (R,G,B)

        # Convert RGB to BGR for OpenCV compatibility
        light = light[::-1]
        dark = dark[::-1]

        h, w = image.shape
        colored = np.zeros((h, w, 3), dtype=np.uint8)

        mask = image == 255  # white
        colored[mask] = light
        colored[~mask] = dark

        return colored

    @staticmethod
    def upscale(image: NDArray[np.uint8], target_size: tuple[int, int]) -> NDArray[np.uint8]:
        """Resizes the image to a target size using nearest-neighbor interpolation.

        Triggered when [DitherConfig.upscale][BayerDithering.core.DitherConfig] is set to True.

        Args:
            image (NDArray[np.uint8]): The processed, downscaled image.
            target_size (tuple[int, int]): The desired (width, height) to restore.

        Returns:
            NDArray[np.uint8]: The upscaled image.
        """

        return cv2.resize(image, target_size, interpolation=cv2.INTER_NEAREST)

    def downscale(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
        """Reduces the image resolution based on the configured downscale factor.

        This uses the value defined in [DitherConfig.downscale_factor][BayerDithering.core.DitherConfig] 
        to divide the original dimensions, achieving a more pixelated retro look.

        Args:
            image (NDArray[np.uint8]): The original input image.

        Returns:
            NDArray[np.uint8]: The downscaled image.
        """

        height, width = image.shape[:2]

        new_width = max(1, width // self.config.downscale_factor)
        new_height = max(1, height // self.config.downscale_factor)
        new_size = (new_width, new_height)
        return cv2.resize(image, new_size, interpolation=cv2.INTER_NEAREST)

    def adjust_contrast(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
        """Adjusts the contrast of an image based on its mean luminance.

        The intensity of the adjustment is controlled by [DitherConfig.contrast][BayerDithering.core.DitherConfig].

        Args:
            image (NDArray[np.uint8]): A grayscale image array.

        Returns:
            NDArray[np.uint8]: The contrast-adjusted grayscale image.
        """

        mean = np.mean(image)
        return np.clip(
            (image - mean) * self.config.contrast + mean,
            0, 255
        ).astype(np.uint8)

    def sharpen(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
        """Applies an unsharp mask filter to enhance edges.

        The intensity of the sharpening is controlled by [DitherConfig.sharpness][BayerDithering.core.DitherConfig].

        Args:
            image (NDArray[np.uint8]): A grayscale image array.

        Returns:
            NDArray[np.uint8]: The sharpened grayscale image.
        """

        blurred = cv2.GaussianBlur(image, (0, 0), sigmaX=1.0)
        return cv2.addWeighted(image, 1 + self.config.sharpness, blurred, -self.config.sharpness, 0)

    def bayer_dither_array(self, image_array: NDArray[np.uint8]) -> NDArray[np.uint8]:
        """Applies the Bayer thresholding to a grayscale image array.

        It utilizes the pre-computed matrix defined in [DitherConfig.b_matrix][BayerDithering.core.DitherConfig], 
        tiling it across the entire image to determine pixel activation.

        Args:
            image_array (NDArray[np.uint8]): The pre-processed grayscale image.

        Returns:
            NDArray[np.uint8]: The binarized image (containing only 0 or 255 values).
        """

        matrix = self.bayer_matrix_u8

        h, w = image_array.shape
        m = matrix.shape[0]

        tiled_matrix = np.tile(matrix, (h // m + 1, w // m + 1))[:h, :w]

        return (image_array > tiled_matrix).astype(np.uint8) * 255

    def process_frame(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
        """Executes the full CPU processing pipeline on a single frame.

        The pipeline order is: Grayscale Conversion -> Contrast -> Sharpening -> 
        Downscaling -> Bayer Dithering -> Color Filtering (Optional) -> Upscaling (Optional).

        Args:
            image (NDArray[np.uint8]): The raw input image in BGR format.

        Returns:
            NDArray[np.uint8]: The final processed and dithered image.

        Raises:
            ValueError: If the provided input image is None.
        """

        if image is None:
            raise ValueError("Invalid input image provided.")

        original_h, original_w = image.shape[:2]

        grayscale_img = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        img = self.adjust_contrast(image=grayscale_img)
        img = self.sharpen(image=img)
        img = self.downscale(image=img)
        img = self.bayer_dither_array(img)

        if self.config.filter is not None:
            img = self.apply_color_filter(img, self.config.filter)

        if self.config.upscale:
            img = self.upscale(image=img, target_size=(original_w, original_h))

        return img

adjust_contrast(image: NDArray[np.uint8]) -> NDArray[np.uint8]

Adjusts the contrast of an image based on its mean luminance.

The intensity of the adjustment is controlled by DitherConfig.contrast.

Parameters:

Name Type Description Default
image NDArray[uint8]

A grayscale image array.

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: The contrast-adjusted grayscale image.

Source code in BayerDithering/cpu.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def adjust_contrast(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
    """Adjusts the contrast of an image based on its mean luminance.

    The intensity of the adjustment is controlled by [DitherConfig.contrast][BayerDithering.core.DitherConfig].

    Args:
        image (NDArray[np.uint8]): A grayscale image array.

    Returns:
        NDArray[np.uint8]: The contrast-adjusted grayscale image.
    """

    mean = np.mean(image)
    return np.clip(
        (image - mean) * self.config.contrast + mean,
        0, 255
    ).astype(np.uint8)

apply_color_filter(image: NDArray[np.uint8], colors: ColorFilter) -> NDArray[np.uint8] staticmethod

Applies a duotone filter to a binarized image.

Parameters:

Name Type Description Default
image NDArray[uint8]

Binarized grayscale image (values 0 and 255 only).

required
colors ColorFilter

A tuple containing (light_color_rgb, dark_color_rgb).

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: Colored image in BGR format.

Source code in BayerDithering/cpu.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@staticmethod
def apply_color_filter(image: NDArray[np.uint8], colors: ColorFilter) -> NDArray[np.uint8]:
    """Applies a duotone filter to a binarized image.

    Args:
        image (NDArray[np.uint8]): Binarized grayscale image (values 0 and 255 only).
        colors (ColorFilter): A tuple containing (light_color_rgb, dark_color_rgb).

    Returns:
        NDArray[np.uint8]: Colored image in BGR format.
    """

    light, dark = colors  # (R,G,B), (R,G,B)

    # Convert RGB to BGR for OpenCV compatibility
    light = light[::-1]
    dark = dark[::-1]

    h, w = image.shape
    colored = np.zeros((h, w, 3), dtype=np.uint8)

    mask = image == 255  # white
    colored[mask] = light
    colored[~mask] = dark

    return colored

bayer_dither_array(image_array: NDArray[np.uint8]) -> NDArray[np.uint8]

Applies the Bayer thresholding to a grayscale image array.

It utilizes the pre-computed matrix defined in DitherConfig.b_matrix, tiling it across the entire image to determine pixel activation.

Parameters:

Name Type Description Default
image_array NDArray[uint8]

The pre-processed grayscale image.

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: The binarized image (containing only 0 or 255 values).

Source code in BayerDithering/cpu.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def bayer_dither_array(self, image_array: NDArray[np.uint8]) -> NDArray[np.uint8]:
    """Applies the Bayer thresholding to a grayscale image array.

    It utilizes the pre-computed matrix defined in [DitherConfig.b_matrix][BayerDithering.core.DitherConfig], 
    tiling it across the entire image to determine pixel activation.

    Args:
        image_array (NDArray[np.uint8]): The pre-processed grayscale image.

    Returns:
        NDArray[np.uint8]: The binarized image (containing only 0 or 255 values).
    """

    matrix = self.bayer_matrix_u8

    h, w = image_array.shape
    m = matrix.shape[0]

    tiled_matrix = np.tile(matrix, (h // m + 1, w // m + 1))[:h, :w]

    return (image_array > tiled_matrix).astype(np.uint8) * 255

downscale(image: NDArray[np.uint8]) -> NDArray[np.uint8]

Reduces the image resolution based on the configured downscale factor.

This uses the value defined in DitherConfig.downscale_factor to divide the original dimensions, achieving a more pixelated retro look.

Parameters:

Name Type Description Default
image NDArray[uint8]

The original input image.

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: The downscaled image.

Source code in BayerDithering/cpu.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def downscale(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
    """Reduces the image resolution based on the configured downscale factor.

    This uses the value defined in [DitherConfig.downscale_factor][BayerDithering.core.DitherConfig] 
    to divide the original dimensions, achieving a more pixelated retro look.

    Args:
        image (NDArray[np.uint8]): The original input image.

    Returns:
        NDArray[np.uint8]: The downscaled image.
    """

    height, width = image.shape[:2]

    new_width = max(1, width // self.config.downscale_factor)
    new_height = max(1, height // self.config.downscale_factor)
    new_size = (new_width, new_height)
    return cv2.resize(image, new_size, interpolation=cv2.INTER_NEAREST)

process_frame(image: NDArray[np.uint8]) -> NDArray[np.uint8]

Executes the full CPU processing pipeline on a single frame.

The pipeline order is: Grayscale Conversion -> Contrast -> Sharpening -> Downscaling -> Bayer Dithering -> Color Filtering (Optional) -> Upscaling (Optional).

Parameters:

Name Type Description Default
image NDArray[uint8]

The raw input image in BGR format.

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: The final processed and dithered image.

Raises:

Type Description
ValueError

If the provided input image is None.

Source code in BayerDithering/cpu.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def process_frame(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
    """Executes the full CPU processing pipeline on a single frame.

    The pipeline order is: Grayscale Conversion -> Contrast -> Sharpening -> 
    Downscaling -> Bayer Dithering -> Color Filtering (Optional) -> Upscaling (Optional).

    Args:
        image (NDArray[np.uint8]): The raw input image in BGR format.

    Returns:
        NDArray[np.uint8]: The final processed and dithered image.

    Raises:
        ValueError: If the provided input image is None.
    """

    if image is None:
        raise ValueError("Invalid input image provided.")

    original_h, original_w = image.shape[:2]

    grayscale_img = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    img = self.adjust_contrast(image=grayscale_img)
    img = self.sharpen(image=img)
    img = self.downscale(image=img)
    img = self.bayer_dither_array(img)

    if self.config.filter is not None:
        img = self.apply_color_filter(img, self.config.filter)

    if self.config.upscale:
        img = self.upscale(image=img, target_size=(original_w, original_h))

    return img

sharpen(image: NDArray[np.uint8]) -> NDArray[np.uint8]

Applies an unsharp mask filter to enhance edges.

The intensity of the sharpening is controlled by DitherConfig.sharpness.

Parameters:

Name Type Description Default
image NDArray[uint8]

A grayscale image array.

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: The sharpened grayscale image.

Source code in BayerDithering/cpu.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def sharpen(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
    """Applies an unsharp mask filter to enhance edges.

    The intensity of the sharpening is controlled by [DitherConfig.sharpness][BayerDithering.core.DitherConfig].

    Args:
        image (NDArray[np.uint8]): A grayscale image array.

    Returns:
        NDArray[np.uint8]: The sharpened grayscale image.
    """

    blurred = cv2.GaussianBlur(image, (0, 0), sigmaX=1.0)
    return cv2.addWeighted(image, 1 + self.config.sharpness, blurred, -self.config.sharpness, 0)

upscale(image: NDArray[np.uint8], target_size: tuple[int, int]) -> NDArray[np.uint8] staticmethod

Resizes the image to a target size using nearest-neighbor interpolation.

Triggered when DitherConfig.upscale is set to True.

Parameters:

Name Type Description Default
image NDArray[uint8]

The processed, downscaled image.

required
target_size tuple[int, int]

The desired (width, height) to restore.

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: The upscaled image.

Source code in BayerDithering/cpu.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@staticmethod
def upscale(image: NDArray[np.uint8], target_size: tuple[int, int]) -> NDArray[np.uint8]:
    """Resizes the image to a target size using nearest-neighbor interpolation.

    Triggered when [DitherConfig.upscale][BayerDithering.core.DitherConfig] is set to True.

    Args:
        image (NDArray[np.uint8]): The processed, downscaled image.
        target_size (tuple[int, int]): The desired (width, height) to restore.

    Returns:
        NDArray[np.uint8]: The upscaled image.
    """

    return cv2.resize(image, target_size, interpolation=cv2.INTER_NEAREST)

BayerDithering.GPUProcessor

Bases: MediaProcessor

GPU-accelerated implementation for Bayer Dithering using Taichi.

Highly recommended for video processing due to its massive parallelization capabilities.

Source code in BayerDithering/gpu.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
@ti.data_oriented
class GPUProcessor(MediaProcessor):
    """GPU-accelerated implementation for Bayer Dithering using Taichi.

    Highly recommended for video processing due to its massive parallelization capabilities.
    """

    def __init__(self, config: DitherConfig) -> None:
        if not TAICHI_AVAILABLE:
            raise ImportError("Taichi backend is not installed. Install with: pip install BayerDithering[gpu]")

        super().__init__(config)

        with contextlib.redirect_stdout(io.StringIO()):
            ti.init(arch=ti.gpu, verbose=False)

        self.batch_size = None
        self.height = None
        self.width = None
        self.out_height = None
        self.out_width = None

        self.frames = None
        self.out_frames = None

        self.bayer_size = self.config.b_matrix.shape
        self.bayer_h, self.bayer_w = self.bayer_size

        self.bayer = ti.field(dtype=ti.f32, shape=self.bayer_size)
        self.bayer.from_numpy(config.b_matrix.astype(np.float32))

    def _set_frames_buffer(self, shape: tuple[int, int], new_shape: tuple[int, int], batch_size: int = 64) -> None:
        """Initializes or resizes the Taichi ndarrays used for GPU memory buffering.

        Args:
            shape (tuple[int, int]): Original (height, width) of the frames.
            new_shape (tuple[int, int]): Target (height, width) after downscaling.
            batch_size (int, optional): Number of frames to process in a single batch. Defaults to 64.
        """

        h, w = shape
        h1, w1 = new_shape

        self.batch_size = batch_size
        self.height = h
        self.width = w
        self.out_height = h1
        self.out_width = w1

        self.frames = ti.ndarray(dtype=ti.u8, shape=(batch_size, h, w, 3))
        self.out_frames = ti.ndarray(dtype=ti.u8, shape=(batch_size, h1, w1, 3))

    @ti.func
    def get_gray(self, frames: ti.template(), n: ti.i32, y: ti.i32, x: ti.i32) -> ti.f32: # type: ignore
        """Taichi sub-function to extract and convert a specific BGR pixel to grayscale luminance.

        Args:
            frames (ti.template()): The Taichi field or ndarray containing the image batch.
            n (ti.i32): The index of the frame within the batch.
            y (ti.i32): The vertical coordinate (row) of the pixel.
            x (ti.i32): The horizontal coordinate (column) of the pixel.

        Returns:
            ti.f32: The calculated grayscale luminance value (0.0 to 255.0).

        """

        b = ti.cast(frames[n, y, x, 0], ti.f32)
        g = ti.cast(frames[n, y, x, 1], ti.f32)
        r = ti.cast(frames[n, y, x, 2], ti.f32)
        return 0.299 * r + 0.587 * g + 0.114 * b

    @ti.kernel
    def compute_batch_mean(
        self,
        frames: ti.types.ndarray(dtype=ti.u8, ndim=4), # type: ignore
        batch_len: ti.i32,
        src_h: ti.i32,
        src_w: ti.i32
    ) -> ti.f32:
        """Parallel reduction kernel to compute the mean luminance of an entire video batch.

        Args:
            frames (ti.types.ndarray(dtype=ti.u8, ndim=4)): The batch of input frames.
            batch_len (ti.i32): The number of frames currently in the batch.
            src_h (ti.i32): The height of the source frames.
            src_w (ti.i32): The width of the source frames.

        Returns:
            ti.f32: The overall mean grayscale value of the batch.
        """

        sum_gray = 0.0
        for n, y, x in ti.ndrange(batch_len, src_h, src_w):
            sum_gray += self.get_gray(frames, n, y, x)

        total_pixels = ti.cast(batch_len * src_h * src_w, ti.f32)
        return sum_gray / total_pixels

    @ti.kernel
    def process_video_batch_kernel(
        self,
        frames: ti.types.ndarray(dtype=ti.u8, ndim=4), # type: ignore
        out_frames: ti.types.ndarray(dtype=ti.u8, ndim=4), # type: ignore
        batch_len: ti.i32,
        src_h: ti.i32,
        src_w: ti.i32,
        out_h: ti.i32,
        out_w: ti.i32,
        downscale: ti.i32,
        contrast: ti.f32,
        mean_val: ti.f32,
        sharpness: ti.f32,
        use_filter: ti.i32,
        r0: ti.u8, g0: ti.u8, b0: ti.u8,
        r1: ti.u8, g1: ti.u8, b1: ti.u8
    ):
        """Core GPU kernel that applies unsharp masking, contrast adjustment, and Bayer dithering to a batch of frames.

        Args:
            frames (ti.types.ndarray(dtype=ti.u8, ndim=4)): The input batch of frames in BGR format.
            out_frames (ti.types.ndarray(dtype=ti.u8, ndim=4)): The output buffer where processed frames are written.
            batch_len (ti.i32): The number of frames currently in the batch.
            src_h (ti.i32): The original height of the source frames.
            src_w (ti.i32): The original width of the source frames.
            out_h (ti.i32): The target height of the output frames (after downscaling).
            out_w (ti.i32): The target width of the output frames (after downscaling).
            downscale (ti.i32): The factor by which the image is scaled down.
            contrast (ti.f32): The contrast multiplier to apply.
            mean_val (ti.f32): The mean luminance of the batch used as a pivot for contrast adjustment.
            sharpness (ti.f32): The intensity of the unsharp mask filter.
            use_filter (ti.i32): Flag indicating whether to apply a custom color palette (1 for True, 0 for False).
            r0 (ti.u8): Red channel of the dark color palette.
            g0 (ti.u8): Green channel of the dark color palette.
            b0 (ti.u8): Blue channel of the dark color palette.
            r1 (ti.u8): Red channel of the light color palette.
            g1 (ti.u8): Green channel of the light color palette.
            b1 (ti.u8): Blue channel of the light color palette.
        """

        for n, y, x in ti.ndrange(batch_len, out_h, out_w):
            src_x = x * downscale
            src_y = y * downscale

            # Sharpness via 3x3 Convolution (reading neighboring pixels)
            x_left = ti.max(0, src_x - 1)
            x_right = ti.min(src_w - 1, src_x + 1)
            y_up = ti.max(0, src_y - 1)
            y_down = ti.min(src_h - 1, src_y + 1)

            center = self.get_gray(frames, n, src_y, src_x)
            up = self.get_gray(frames, n, y_up, src_x)
            down = self.get_gray(frames, n, y_down, src_x)
            left = self.get_gray(frames, n, src_y, x_left)
            right = self.get_gray(frames, n, src_y, x_right)

            # Apply Laplacian matrix to enhance edge sharpness
            gray = center + sharpness * (4.0 * center - up - down - left - right)

            # Contrast
            gray = (gray - mean_val) * contrast + mean_val

            # Clamp (constrain mathematical results to the visible color range of 0 to 255)
            gray = ti.max(0.0, ti.min(255.0, gray))

            # Bayer Dither
            by = y % self.bayer_h
            bx = x % self.bayer_w
            threshold = self.bayer[by, bx] * 255.0
            val = 255 if gray > threshold else 0

            if use_filter == 1:
                if val == 0:
                    out_frames[n, y, x, 0] = b0
                    out_frames[n, y, x, 1] = g0
                    out_frames[n, y, x, 2] = r0
                else:
                    out_frames[n, y, x, 0] = b1
                    out_frames[n, y, x, 1] = g1
                    out_frames[n, y, x, 2] = r1
            else:
                val_u8 = ti.cast(val, ti.u8)
                out_frames[n, y, x, 0] = val_u8
                out_frames[n, y, x, 1] = val_u8
                out_frames[n, y, x, 2] = val_u8

    def process_video_batch(self, batch: list[NDArray[np.uint8]]) -> NDArray[np.uint8]:
        """Manages the memory transfer and execution of the GPU kernel for a batch of frames.

        Args:
            batch (list[NDArray[np.uint8]]): A list of raw BGR frames.

        Returns:
            NDArray[np.uint8]: A numpy array containing the processed frames.
        """

        if not batch:
            return []

        original_h, original_w = batch[0].shape[:2]
        current_batch_len = len(batch)

        downscaled_w = max(1, original_w // self.config.downscale_factor)
        downscaled_h = max(1, original_h // self.config.downscale_factor)

        if (self.frames is None or 
            self.height != original_h or 
            self.width != original_w or 
            self.batch_size < current_batch_len):

            self._set_frames_buffer(
                shape=(original_h, original_w),
                new_shape=(downscaled_h, downscaled_w),
                batch_size=current_batch_len
            )

        batch_np = np.ascontiguousarray(np.stack(batch), dtype=np.uint8)

        temp = np.zeros((self.batch_size, self.height, self.width, 3), dtype=np.uint8)
        temp[:current_batch_len] = batch_np

        self.frames.from_numpy(temp)

        r0 = g0 = b0 = r1 = g1 = b1 = 0
        use_filter = 0

        if self.config.filter:
            (r1, g1, b1), (r0, g0, b0) = self.config.filter
            use_filter = 1

        mean_val = self.compute_batch_mean(
            self.frames, current_batch_len, self.height, self.width
        )

        self.process_video_batch_kernel(
            self.frames,
            self.out_frames,
            current_batch_len,
            self.height,
            self.width,
            self.out_height,
            self.out_width,
            self.config.downscale_factor,
            self.config.contrast,
            mean_val,
            self.config.sharpness,
            use_filter,
            r0, g0, b0,
            r1, g1, b1
        )

        result = self.out_frames.to_numpy()
        return result[:current_batch_len]

    def process_frame(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
        """Executes the GPU pipeline for a single frame.

        Args:
            image (NDArray[np.uint8]): Input image in BGR format.

        Returns:
            NDArray[np.uint8]: Processed dithered image.
        """

        original_h, original_w = image.shape[:2]

        downscaled_w = max(1, original_w // self.config.downscale_factor)
        downscaled_h = max(1, original_h // self.config.downscale_factor)

        if (self.frames is None or 
            self.height != original_h or 
            self.width != original_w or 
            self.batch_size != 1):

            self._set_frames_buffer(
                shape=(original_h, original_w),
                new_shape=(downscaled_h, downscaled_w),
                batch_size=1
            )

        if len(image.shape) == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

        temp = np.zeros((1, self.height, self.width, 3), dtype=np.uint8)
        temp[0] = np.ascontiguousarray(image)
        self.frames.from_numpy(temp)

        r0 = g0 = b0 = r1 = g1 = b1 = 0
        use_filter = 0
        if self.config.filter:
            (r1, g1, b1), (r0, g0, b0) = self.config.filter
            use_filter = 1

        mean_val = self.compute_batch_mean(self.frames, 1, self.height, self.width)

        self.process_video_batch_kernel(
            self.frames,
            self.out_frames,
            1,
            self.height,
            self.width,
            self.out_height,
            self.out_width,
            self.config.downscale_factor,
            self.config.contrast,
            mean_val,
            self.config.sharpness,
            use_filter,
            r0, g0, b0,
            r1, g1, b1
        )

        result = self.out_frames.to_numpy()[0]

        if self.config.upscale:
            result = cv2.resize(
                result, 
                (original_w, original_h), 
                interpolation=cv2.INTER_NEAREST
            )

        return result

compute_batch_mean(frames: ti.types.ndarray(dtype=(ti.u8), ndim=4), batch_len: ti.i32, src_h: ti.i32, src_w: ti.i32) -> ti.f32

Parallel reduction kernel to compute the mean luminance of an entire video batch.

Parameters:

Name Type Description Default
frames ndarray(dtype=u8, ndim=4)

The batch of input frames.

required
batch_len i32

The number of frames currently in the batch.

required
src_h i32

The height of the source frames.

required
src_w i32

The width of the source frames.

required

Returns:

Type Description
f32

ti.f32: The overall mean grayscale value of the batch.

Source code in BayerDithering/gpu.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@ti.kernel
def compute_batch_mean(
    self,
    frames: ti.types.ndarray(dtype=ti.u8, ndim=4), # type: ignore
    batch_len: ti.i32,
    src_h: ti.i32,
    src_w: ti.i32
) -> ti.f32:
    """Parallel reduction kernel to compute the mean luminance of an entire video batch.

    Args:
        frames (ti.types.ndarray(dtype=ti.u8, ndim=4)): The batch of input frames.
        batch_len (ti.i32): The number of frames currently in the batch.
        src_h (ti.i32): The height of the source frames.
        src_w (ti.i32): The width of the source frames.

    Returns:
        ti.f32: The overall mean grayscale value of the batch.
    """

    sum_gray = 0.0
    for n, y, x in ti.ndrange(batch_len, src_h, src_w):
        sum_gray += self.get_gray(frames, n, y, x)

    total_pixels = ti.cast(batch_len * src_h * src_w, ti.f32)
    return sum_gray / total_pixels

get_gray(frames: ti.template(), n: ti.i32, y: ti.i32, x: ti.i32) -> ti.f32

Taichi sub-function to extract and convert a specific BGR pixel to grayscale luminance.

Parameters:

Name Type Description Default
frames template()

The Taichi field or ndarray containing the image batch.

required
n i32

The index of the frame within the batch.

required
y i32

The vertical coordinate (row) of the pixel.

required
x i32

The horizontal coordinate (column) of the pixel.

required

Returns:

Type Description
f32

ti.f32: The calculated grayscale luminance value (0.0 to 255.0).

Source code in BayerDithering/gpu.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@ti.func
def get_gray(self, frames: ti.template(), n: ti.i32, y: ti.i32, x: ti.i32) -> ti.f32: # type: ignore
    """Taichi sub-function to extract and convert a specific BGR pixel to grayscale luminance.

    Args:
        frames (ti.template()): The Taichi field or ndarray containing the image batch.
        n (ti.i32): The index of the frame within the batch.
        y (ti.i32): The vertical coordinate (row) of the pixel.
        x (ti.i32): The horizontal coordinate (column) of the pixel.

    Returns:
        ti.f32: The calculated grayscale luminance value (0.0 to 255.0).

    """

    b = ti.cast(frames[n, y, x, 0], ti.f32)
    g = ti.cast(frames[n, y, x, 1], ti.f32)
    r = ti.cast(frames[n, y, x, 2], ti.f32)
    return 0.299 * r + 0.587 * g + 0.114 * b

process_frame(image: NDArray[np.uint8]) -> NDArray[np.uint8]

Executes the GPU pipeline for a single frame.

Parameters:

Name Type Description Default
image NDArray[uint8]

Input image in BGR format.

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: Processed dithered image.

Source code in BayerDithering/gpu.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def process_frame(self, image: NDArray[np.uint8]) -> NDArray[np.uint8]:
    """Executes the GPU pipeline for a single frame.

    Args:
        image (NDArray[np.uint8]): Input image in BGR format.

    Returns:
        NDArray[np.uint8]: Processed dithered image.
    """

    original_h, original_w = image.shape[:2]

    downscaled_w = max(1, original_w // self.config.downscale_factor)
    downscaled_h = max(1, original_h // self.config.downscale_factor)

    if (self.frames is None or 
        self.height != original_h or 
        self.width != original_w or 
        self.batch_size != 1):

        self._set_frames_buffer(
            shape=(original_h, original_w),
            new_shape=(downscaled_h, downscaled_w),
            batch_size=1
        )

    if len(image.shape) == 2:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

    temp = np.zeros((1, self.height, self.width, 3), dtype=np.uint8)
    temp[0] = np.ascontiguousarray(image)
    self.frames.from_numpy(temp)

    r0 = g0 = b0 = r1 = g1 = b1 = 0
    use_filter = 0
    if self.config.filter:
        (r1, g1, b1), (r0, g0, b0) = self.config.filter
        use_filter = 1

    mean_val = self.compute_batch_mean(self.frames, 1, self.height, self.width)

    self.process_video_batch_kernel(
        self.frames,
        self.out_frames,
        1,
        self.height,
        self.width,
        self.out_height,
        self.out_width,
        self.config.downscale_factor,
        self.config.contrast,
        mean_val,
        self.config.sharpness,
        use_filter,
        r0, g0, b0,
        r1, g1, b1
    )

    result = self.out_frames.to_numpy()[0]

    if self.config.upscale:
        result = cv2.resize(
            result, 
            (original_w, original_h), 
            interpolation=cv2.INTER_NEAREST
        )

    return result

process_video_batch(batch: list[NDArray[np.uint8]]) -> NDArray[np.uint8]

Manages the memory transfer and execution of the GPU kernel for a batch of frames.

Parameters:

Name Type Description Default
batch list[NDArray[uint8]]

A list of raw BGR frames.

required

Returns:

Type Description
NDArray[uint8]

NDArray[np.uint8]: A numpy array containing the processed frames.

Source code in BayerDithering/gpu.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def process_video_batch(self, batch: list[NDArray[np.uint8]]) -> NDArray[np.uint8]:
    """Manages the memory transfer and execution of the GPU kernel for a batch of frames.

    Args:
        batch (list[NDArray[np.uint8]]): A list of raw BGR frames.

    Returns:
        NDArray[np.uint8]: A numpy array containing the processed frames.
    """

    if not batch:
        return []

    original_h, original_w = batch[0].shape[:2]
    current_batch_len = len(batch)

    downscaled_w = max(1, original_w // self.config.downscale_factor)
    downscaled_h = max(1, original_h // self.config.downscale_factor)

    if (self.frames is None or 
        self.height != original_h or 
        self.width != original_w or 
        self.batch_size < current_batch_len):

        self._set_frames_buffer(
            shape=(original_h, original_w),
            new_shape=(downscaled_h, downscaled_w),
            batch_size=current_batch_len
        )

    batch_np = np.ascontiguousarray(np.stack(batch), dtype=np.uint8)

    temp = np.zeros((self.batch_size, self.height, self.width, 3), dtype=np.uint8)
    temp[:current_batch_len] = batch_np

    self.frames.from_numpy(temp)

    r0 = g0 = b0 = r1 = g1 = b1 = 0
    use_filter = 0

    if self.config.filter:
        (r1, g1, b1), (r0, g0, b0) = self.config.filter
        use_filter = 1

    mean_val = self.compute_batch_mean(
        self.frames, current_batch_len, self.height, self.width
    )

    self.process_video_batch_kernel(
        self.frames,
        self.out_frames,
        current_batch_len,
        self.height,
        self.width,
        self.out_height,
        self.out_width,
        self.config.downscale_factor,
        self.config.contrast,
        mean_val,
        self.config.sharpness,
        use_filter,
        r0, g0, b0,
        r1, g1, b1
    )

    result = self.out_frames.to_numpy()
    return result[:current_batch_len]

process_video_batch_kernel(frames: ti.types.ndarray(dtype=(ti.u8), ndim=4), out_frames: ti.types.ndarray(dtype=(ti.u8), ndim=4), batch_len: ti.i32, src_h: ti.i32, src_w: ti.i32, out_h: ti.i32, out_w: ti.i32, downscale: ti.i32, contrast: ti.f32, mean_val: ti.f32, sharpness: ti.f32, use_filter: ti.i32, r0: ti.u8, g0: ti.u8, b0: ti.u8, r1: ti.u8, g1: ti.u8, b1: ti.u8)

Core GPU kernel that applies unsharp masking, contrast adjustment, and Bayer dithering to a batch of frames.

Parameters:

Name Type Description Default
frames ndarray(dtype=u8, ndim=4)

The input batch of frames in BGR format.

required
out_frames ndarray(dtype=u8, ndim=4)

The output buffer where processed frames are written.

required
batch_len i32

The number of frames currently in the batch.

required
src_h i32

The original height of the source frames.

required
src_w i32

The original width of the source frames.

required
out_h i32

The target height of the output frames (after downscaling).

required
out_w i32

The target width of the output frames (after downscaling).

required
downscale i32

The factor by which the image is scaled down.

required
contrast f32

The contrast multiplier to apply.

required
mean_val f32

The mean luminance of the batch used as a pivot for contrast adjustment.

required
sharpness f32

The intensity of the unsharp mask filter.

required
use_filter i32

Flag indicating whether to apply a custom color palette (1 for True, 0 for False).

required
r0 u8

Red channel of the dark color palette.

required
g0 u8

Green channel of the dark color palette.

required
b0 u8

Blue channel of the dark color palette.

required
r1 u8

Red channel of the light color palette.

required
g1 u8

Green channel of the light color palette.

required
b1 u8

Blue channel of the light color palette.

required
Source code in BayerDithering/gpu.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@ti.kernel
def process_video_batch_kernel(
    self,
    frames: ti.types.ndarray(dtype=ti.u8, ndim=4), # type: ignore
    out_frames: ti.types.ndarray(dtype=ti.u8, ndim=4), # type: ignore
    batch_len: ti.i32,
    src_h: ti.i32,
    src_w: ti.i32,
    out_h: ti.i32,
    out_w: ti.i32,
    downscale: ti.i32,
    contrast: ti.f32,
    mean_val: ti.f32,
    sharpness: ti.f32,
    use_filter: ti.i32,
    r0: ti.u8, g0: ti.u8, b0: ti.u8,
    r1: ti.u8, g1: ti.u8, b1: ti.u8
):
    """Core GPU kernel that applies unsharp masking, contrast adjustment, and Bayer dithering to a batch of frames.

    Args:
        frames (ti.types.ndarray(dtype=ti.u8, ndim=4)): The input batch of frames in BGR format.
        out_frames (ti.types.ndarray(dtype=ti.u8, ndim=4)): The output buffer where processed frames are written.
        batch_len (ti.i32): The number of frames currently in the batch.
        src_h (ti.i32): The original height of the source frames.
        src_w (ti.i32): The original width of the source frames.
        out_h (ti.i32): The target height of the output frames (after downscaling).
        out_w (ti.i32): The target width of the output frames (after downscaling).
        downscale (ti.i32): The factor by which the image is scaled down.
        contrast (ti.f32): The contrast multiplier to apply.
        mean_val (ti.f32): The mean luminance of the batch used as a pivot for contrast adjustment.
        sharpness (ti.f32): The intensity of the unsharp mask filter.
        use_filter (ti.i32): Flag indicating whether to apply a custom color palette (1 for True, 0 for False).
        r0 (ti.u8): Red channel of the dark color palette.
        g0 (ti.u8): Green channel of the dark color palette.
        b0 (ti.u8): Blue channel of the dark color palette.
        r1 (ti.u8): Red channel of the light color palette.
        g1 (ti.u8): Green channel of the light color palette.
        b1 (ti.u8): Blue channel of the light color palette.
    """

    for n, y, x in ti.ndrange(batch_len, out_h, out_w):
        src_x = x * downscale
        src_y = y * downscale

        # Sharpness via 3x3 Convolution (reading neighboring pixels)
        x_left = ti.max(0, src_x - 1)
        x_right = ti.min(src_w - 1, src_x + 1)
        y_up = ti.max(0, src_y - 1)
        y_down = ti.min(src_h - 1, src_y + 1)

        center = self.get_gray(frames, n, src_y, src_x)
        up = self.get_gray(frames, n, y_up, src_x)
        down = self.get_gray(frames, n, y_down, src_x)
        left = self.get_gray(frames, n, src_y, x_left)
        right = self.get_gray(frames, n, src_y, x_right)

        # Apply Laplacian matrix to enhance edge sharpness
        gray = center + sharpness * (4.0 * center - up - down - left - right)

        # Contrast
        gray = (gray - mean_val) * contrast + mean_val

        # Clamp (constrain mathematical results to the visible color range of 0 to 255)
        gray = ti.max(0.0, ti.min(255.0, gray))

        # Bayer Dither
        by = y % self.bayer_h
        bx = x % self.bayer_w
        threshold = self.bayer[by, bx] * 255.0
        val = 255 if gray > threshold else 0

        if use_filter == 1:
            if val == 0:
                out_frames[n, y, x, 0] = b0
                out_frames[n, y, x, 1] = g0
                out_frames[n, y, x, 2] = r0
            else:
                out_frames[n, y, x, 0] = b1
                out_frames[n, y, x, 1] = g1
                out_frames[n, y, x, 2] = r1
        else:
            val_u8 = ti.cast(val, ti.u8)
            out_frames[n, y, x, 0] = val_u8
            out_frames[n, y, x, 1] = val_u8
            out_frames[n, y, x, 2] = val_u8

Utilities & Media Wrappers

Helper classes used internally to securely manage temporary file streams, merge audio, and ensure proper cleanup from the disk after the processing is finished.

BayerDithering.utils.ProcessedVideo

Wrapper for a processed video saved in a temporary file.

Provides utility methods to save the final video, merge the original audio, and safely clean up the temporary files from the disk using a context manager.

Source code in BayerDithering/utils.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class ProcessedVideo:
    """Wrapper for a processed video saved in a temporary file.

    Provides utility methods to save the final video, merge the original audio, 
    and safely clean up the temporary files from the disk using a context manager.
    """

    def __init__(self, path: str, log_level: int) -> None:
        """Initializes the ProcessedVideo wrapper.

        Args:
            path (str): The absolute path to the temporary processed video file.
            log_level (int): The logging level inherited from the main orchestrator.
        """

        self.logger = logging.getLogger(f"{__name__}.ProcessedVideo")
        self.logger.setLevel(level=log_level)

        self.path = path
        self._cap = None

    def open(self) -> cv2.VideoCapture:
        """Opens the processed video file for reading.

        Returns:
            cv2.VideoCapture: An OpenCV VideoCapture object pointing to the processed video.
        """

        if self._cap is None:
            self._cap = cv2.VideoCapture(self.path)
        return self._cap

    def save(self, path: str) -> None:
        """Saves the processed video to the specified destination path without audio.

        Args:
            path (str): The destination file path (e.g., 'output.mp4').
        """

        self.logger.debug(f"Saving video to {path} ...")
        shutil.copy(self.path, path)
        os.chmod(path, 0o644)

    def save_with_audio(self, original_video_path: str, path: str) -> None:
        """Merges the audio track from the original video into the processed video.

        Requires FFmpeg to be installed and available in the system's PATH.

        Args:
            original_video_path (str): The path to the source video containing the audio track.
            path (str): The destination file path for the final merged video.

        Raises:
            RuntimeError: If the FFmpeg executable is not found in the system PATH.
            ValueError: If the original_video_path is empty.
            FileNotFoundError: If the original_video_path does not exist on the disk.
        """

        if shutil.which("ffmpeg") is None:
            raise RuntimeError(
                "FFmpeg executable not found in PATH. "
                "Please install it from https://ffmpeg.org/download.html "
                "and ensure it is available in your system PATH."
            )
        if not original_video_path:
            raise ValueError("original_video_path is required to merge audio")
        if not os.path.exists(original_video_path):
            raise FileNotFoundError(f"original_video_path={original_video_path} does not exist")

        self.logger.debug(f"Saving video with audio to {path}...")
        self.logger.debug(f"Merging audio from {original_video_path} into {self.path}")

        fd, temp_path = tempfile.mkstemp(suffix=".mp4")
        os.close(fd)

        cmd = [
            "ffmpeg",
            "-y",
            "-i", self.path,             # processed video (video stream)
            "-i", original_video_path,   # original video (audio stream)
            "-c:v", "copy",              # do not re-encode video
            "-c:a", "aac",               # encode audio to AAC
            "-map", "0:v:0",             # take video from first input
            "-map", "1:a:0?",            # take audio from second input (if it exists)
            temp_path
        ]

        try:
            subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
            shutil.copy2(temp_path, path)
            os.chmod(path, 0o644)
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)

        self.logger.debug(f"Audio merged successfully into {path}")

    def release(self) -> None:
        """Closes the video capture and deletes the temporary video file from the disk."""

        if self._cap is not None:
            self._cap.release()
            self._cap = None

        if self.path and os.path.exists(self.path):
            self.logger.debug(f"Removing temporary file {self.path} ...")
            os.remove(self.path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()

__init__(path: str, log_level: int) -> None

Initializes the ProcessedVideo wrapper.

Parameters:

Name Type Description Default
path str

The absolute path to the temporary processed video file.

required
log_level int

The logging level inherited from the main orchestrator.

required
Source code in BayerDithering/utils.py
22
23
24
25
26
27
28
29
30
31
32
33
34
def __init__(self, path: str, log_level: int) -> None:
    """Initializes the ProcessedVideo wrapper.

    Args:
        path (str): The absolute path to the temporary processed video file.
        log_level (int): The logging level inherited from the main orchestrator.
    """

    self.logger = logging.getLogger(f"{__name__}.ProcessedVideo")
    self.logger.setLevel(level=log_level)

    self.path = path
    self._cap = None

open() -> cv2.VideoCapture

Opens the processed video file for reading.

Returns:

Type Description
VideoCapture

cv2.VideoCapture: An OpenCV VideoCapture object pointing to the processed video.

Source code in BayerDithering/utils.py
36
37
38
39
40
41
42
43
44
45
def open(self) -> cv2.VideoCapture:
    """Opens the processed video file for reading.

    Returns:
        cv2.VideoCapture: An OpenCV VideoCapture object pointing to the processed video.
    """

    if self._cap is None:
        self._cap = cv2.VideoCapture(self.path)
    return self._cap

release() -> None

Closes the video capture and deletes the temporary video file from the disk.

Source code in BayerDithering/utils.py
112
113
114
115
116
117
118
119
120
121
def release(self) -> None:
    """Closes the video capture and deletes the temporary video file from the disk."""

    if self._cap is not None:
        self._cap.release()
        self._cap = None

    if self.path and os.path.exists(self.path):
        self.logger.debug(f"Removing temporary file {self.path} ...")
        os.remove(self.path)

save(path: str) -> None

Saves the processed video to the specified destination path without audio.

Parameters:

Name Type Description Default
path str

The destination file path (e.g., 'output.mp4').

required
Source code in BayerDithering/utils.py
47
48
49
50
51
52
53
54
55
56
def save(self, path: str) -> None:
    """Saves the processed video to the specified destination path without audio.

    Args:
        path (str): The destination file path (e.g., 'output.mp4').
    """

    self.logger.debug(f"Saving video to {path} ...")
    shutil.copy(self.path, path)
    os.chmod(path, 0o644)

save_with_audio(original_video_path: str, path: str) -> None

Merges the audio track from the original video into the processed video.

Requires FFmpeg to be installed and available in the system's PATH.

Parameters:

Name Type Description Default
original_video_path str

The path to the source video containing the audio track.

required
path str

The destination file path for the final merged video.

required

Raises:

Type Description
RuntimeError

If the FFmpeg executable is not found in the system PATH.

ValueError

If the original_video_path is empty.

FileNotFoundError

If the original_video_path does not exist on the disk.

Source code in BayerDithering/utils.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def save_with_audio(self, original_video_path: str, path: str) -> None:
    """Merges the audio track from the original video into the processed video.

    Requires FFmpeg to be installed and available in the system's PATH.

    Args:
        original_video_path (str): The path to the source video containing the audio track.
        path (str): The destination file path for the final merged video.

    Raises:
        RuntimeError: If the FFmpeg executable is not found in the system PATH.
        ValueError: If the original_video_path is empty.
        FileNotFoundError: If the original_video_path does not exist on the disk.
    """

    if shutil.which("ffmpeg") is None:
        raise RuntimeError(
            "FFmpeg executable not found in PATH. "
            "Please install it from https://ffmpeg.org/download.html "
            "and ensure it is available in your system PATH."
        )
    if not original_video_path:
        raise ValueError("original_video_path is required to merge audio")
    if not os.path.exists(original_video_path):
        raise FileNotFoundError(f"original_video_path={original_video_path} does not exist")

    self.logger.debug(f"Saving video with audio to {path}...")
    self.logger.debug(f"Merging audio from {original_video_path} into {self.path}")

    fd, temp_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)

    cmd = [
        "ffmpeg",
        "-y",
        "-i", self.path,             # processed video (video stream)
        "-i", original_video_path,   # original video (audio stream)
        "-c:v", "copy",              # do not re-encode video
        "-c:a", "aac",               # encode audio to AAC
        "-map", "0:v:0",             # take video from first input
        "-map", "1:a:0?",            # take audio from second input (if it exists)
        temp_path
    ]

    try:
        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
        shutil.copy2(temp_path, path)
        os.chmod(path, 0o644)
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)

    self.logger.debug(f"Audio merged successfully into {path}")

BayerDithering.utils.ProcessedGIF

Wrapper for a processed animated GIF saved in a temporary file.

Source code in BayerDithering/utils.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class ProcessedGIF:
    """Wrapper for a processed animated GIF saved in a temporary file."""

    def __init__(self, path: str, log_level: int) -> None:
        self.logger = logging.getLogger(f"{__name__}.ProcessedGIF")
        self.logger.setLevel(level=log_level)
        self.path = path

    def save(self, dest_path: str) -> None:
        """Saves the processed GIF to the specified destination path."""
        self.logger.debug(f"Saving GIF to {dest_path} ...")
        shutil.copy(self.path, dest_path)
        os.chmod(dest_path, 0o644)

    def release(self) -> None:
        """Deletes the temporary GIF file from the disk."""
        if self.path and os.path.exists(self.path):
            self.logger.debug(f"Removing temporary file {self.path} ...")
            os.remove(self.path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()

release() -> None

Deletes the temporary GIF file from the disk.

Source code in BayerDithering/utils.py
143
144
145
146
147
def release(self) -> None:
    """Deletes the temporary GIF file from the disk."""
    if self.path and os.path.exists(self.path):
        self.logger.debug(f"Removing temporary file {self.path} ...")
        os.remove(self.path)

save(dest_path: str) -> None

Saves the processed GIF to the specified destination path.

Source code in BayerDithering/utils.py
137
138
139
140
141
def save(self, dest_path: str) -> None:
    """Saves the processed GIF to the specified destination path."""
    self.logger.debug(f"Saving GIF to {dest_path} ...")
    shutil.copy(self.path, dest_path)
    os.chmod(dest_path, 0o644)

BayerDithering.utils.load_filters() -> dict[str:ColorFilter]

Source code in BayerDithering/utils.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def load_filters() -> dict[str: ColorFilter]:

    base_dir = Path(__file__).parent 
    toml_path = base_dir / 'filters.toml'

    default_filters = {
        "Orange": ((252, 176, 32), (10, 6, 3)),
        "Capuccino": ((200, 185, 150), (61, 49, 40)),
        "Brat": ((137, 205, 0), (0, 0, 0)),
        "Fairy": ((174, 255, 223), (90, 84, 117)),
        "Bloody": ((255, 42, 0), (43, 12, 0)),
        "Lavender": ((196, 167, 231), (35, 33, 54)),
        "Cyan": ((0, 204, 255), (0, 34, 43)),
        "Vapor": ((250, 185, 253), (75, 123, 222)),
        "Matrix": ((0, 255, 0), (0, 39, 6)),
        "ObraDinn": ((229, 255, 254), (51, 51, 25))
    }

    try:
        with open(toml_path, 'rb') as file:
            filters = tomllib.load(file)
        return filters

    except FileNotFoundError:
        print("[FileNotFoundError] The 'filters.toml' file was not found. Using default filter pack...")
        return default_filters

    except tomllib.TOMLDecodeError as e:
        print(f"[TOMLDecodeError] Error parsing the TOML file: {e}\n Using default filter pack...")
        return default_filters

Matrix Generation & Presets

Here you can find the dynamic matrix generator and the pre-computed matrix presets available for immediate use.

BayerDithering.matrices = {'2x2': bayer_matrix_2x2, '4x4': bayer_matrix_4x4, '8x8': bayer_matrix_8x8} module-attribute

A dictionary containing pre-computed, normalized Bayer matrices for quick access.

Available keys: - '2x2' - '4x4' - '8x8'

BayerDithering.generate_bayer_matrix(order: int) -> NDArray[np.float32]

Generates a Bayer matrix of size (2^order x 2^order).

Parameters:

Name Type Description Default
order int

The order of the matrix (e.g., 1 for 2x2, 2 for 4x4, 3 for 8x8).

required

Returns:

Type Description
NDArray[float32]

NDArray[np.float32]: The normalized Bayer matrix.

Source code in BayerDithering/matrix.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def generate_bayer_matrix(order: int) -> NDArray[np.float32]:
    """Generates a Bayer matrix of size (2^order x 2^order).

    Args:
        order (int): The order of the matrix (e.g., 1 for 2x2, 2 for 4x4, 3 for 8x8).

    Returns:
        NDArray[np.float32]: The normalized Bayer matrix.
    """

    matrix = np.array([[0, 2], 
                       [3, 1]], dtype=np.float32)

    for _ in range(order - 1):
        # M_2n = [[4*Mn, 4*Mn + 2], [4*Mn + 3, 4*Mn + 1]]
        top = np.hstack([4 * matrix, 4 * matrix + 2])
        bot = np.hstack([4 * matrix + 3, 4 * matrix + 1])
        matrix = np.vstack([top, bot])

    return matrix / (matrix.max() + 1)