diff --git a/Cargo.lock b/Cargo.lock index 3fae5df6ee..500501d5d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1680,6 +1680,7 @@ dependencies = [ "daft-csv", "daft-dsl", "daft-functions", + "daft-image", "daft-io", "daft-json", "daft-local-execution", @@ -1717,7 +1718,6 @@ version = "0.3.0-dev0" dependencies = [ "aho-corasick", "arrow2", - "base64 0.22.1", "bincode", "chrono", "chrono-tz", @@ -1735,7 +1735,6 @@ dependencies = [ "fnv", "html-escape", "hyperloglog", - "image", "indexmap 2.3.0", "itertools 0.11.0", "jaq-core", @@ -1829,6 +1828,7 @@ dependencies = [ "common-io-config", "daft-core", "daft-dsl", + "daft-image", "daft-io", "futures", "pyo3", @@ -1840,6 +1840,20 @@ dependencies = [ "uuid 1.10.0", ] +[[package]] +name = "daft-image" +version = "0.3.0-dev0" +dependencies = [ + "arrow2", + "base64 0.22.1", + "common-error", + "daft-core", + "image", + "log", + "num-traits", + "pyo3", +] + [[package]] name = "daft-io" version = "0.3.0-dev0" @@ -2173,6 +2187,7 @@ dependencies = [ "common-error", "daft-core", "daft-dsl", + "daft-image", "html-escape", "num-traits", "pyo3", diff --git a/Cargo.toml b/Cargo.toml index 33061a5ed4..9390c5c4db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ daft-core = {path = "src/daft-core", default-features = false} daft-csv = {path = "src/daft-csv", default-features = false} daft-dsl = {path = "src/daft-dsl", default-features = false} daft-functions = {path = "src/daft-functions", default-features = false} +daft-image = {path = "src/daft-image", default-features = false} daft-io = {path = "src/daft-io", default-features = false} daft-json = {path = "src/daft-json", default-features = false} daft-local-execution = {path = "src/daft-local-execution", default-features = false} @@ -41,6 +42,7 @@ python = [ "daft-dsl/python", "daft-local-execution/python", "daft-io/python", + "daft-image/python", "daft-json/python", "daft-micropartition/python", "daft-parquet/python", @@ -114,6 +116,7 @@ members = [ "src/daft-core", "src/daft-local-execution", "src/daft-io", + "src/daft-image", "src/daft-parquet", "src/daft-csv", "src/daft-json", diff --git a/daft/daft.pyi b/daft/daft/__init__.pyi similarity index 99% rename from daft/daft.pyi rename to daft/daft/__init__.pyi index 5d740ff51b..f78e2ae166 100644 --- a/daft/daft.pyi +++ b/daft/daft/__init__.pyi @@ -1385,10 +1385,6 @@ class PySeries: def list_slice(self, start: PySeries, end: PySeries | None = None) -> PySeries: ... def list_sort(self, desc: PySeries) -> PySeries: ... def map_get(self, key: PySeries) -> PySeries: ... - def image_decode(self, raise_error_on_failure: bool, mode: ImageMode | None = None) -> PySeries: ... - def image_encode(self, image_format: ImageFormat) -> PySeries: ... - def image_resize(self, w: int, h: int) -> PySeries: ... - def image_to_mode(self, mode: ImageMode) -> PySeries: ... def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ... def is_null(self) -> PySeries: ... def not_null(self) -> PySeries: ... diff --git a/daft/daft/image.pyi b/daft/daft/image.pyi new file mode 100644 index 0000000000..db92958aa8 --- /dev/null +++ b/daft/daft/image.pyi @@ -0,0 +1,6 @@ +from daft.daft import ImageFormat, ImageMode, PySeries + +def decode(s: PySeries, raise_error_on_failure: bool, mode: ImageMode | None = None) -> PySeries: ... +def encode(s: PySeries, image_format: ImageFormat) -> PySeries: ... +def resize(s: PySeries, w: int, h: int) -> PySeries: ... +def to_mode(s: PySeries, mode: ImageMode) -> PySeries: ... diff --git a/daft/series.py b/daft/series.py index 82875b21cf..25780472da 100644 --- a/daft/series.py +++ b/daft/series.py @@ -5,7 +5,7 @@ import pyarrow as pa from daft.arrow_utils import ensure_array, ensure_chunked_array -from daft.daft import CountMode, ImageFormat, ImageMode, PySeries +from daft.daft import CountMode, ImageFormat, ImageMode, PySeries, image from daft.datatype import DataType from daft.utils import pyarrow_supports_fixed_shape_tensor @@ -994,14 +994,14 @@ def decode( mode = ImageMode.from_mode_string(mode.upper()) if not isinstance(mode, ImageMode): raise ValueError(f"mode must be a string or ImageMode variant, but got: {mode}") - return Series._from_pyseries(self._series.image_decode(raise_error_on_failure=raise_on_error, mode=mode)) + return Series._from_pyseries(image.decode(self._series, raise_error_on_failure=raise_on_error, mode=mode)) def encode(self, image_format: str | ImageFormat) -> Series: if isinstance(image_format, str): image_format = ImageFormat.from_format_string(image_format.upper()) if not isinstance(image_format, ImageFormat): raise ValueError(f"image_format must be a string or ImageFormat variant, but got: {image_format}") - return Series._from_pyseries(self._series.image_encode(image_format)) + return Series._from_pyseries(image.encode(self._series, image_format)) def resize(self, w: int, h: int) -> Series: if not isinstance(w, int): @@ -1009,11 +1009,11 @@ def resize(self, w: int, h: int) -> Series: if not isinstance(h, int): raise TypeError(f"expected int for h but got {type(h)}") - return Series._from_pyseries(self._series.image_resize(w, h)) + return Series._from_pyseries(image.resize(self._series, w, h)) def to_mode(self, mode: str | ImageMode) -> Series: if isinstance(mode, str): mode = ImageMode.from_mode_string(mode.upper()) if not isinstance(mode, ImageMode): raise ValueError(f"mode must be a string or ImageMode variant, but got: {mode}") - return Series._from_pyseries(self._series.image_to_mode(mode)) + return Series._from_pyseries(image.to_mode(self._series, mode)) diff --git a/src/common/display/src/table_display.rs b/src/common/display/src/table_display.rs index a30a0423ea..8f0ba51d1a 100644 --- a/src/common/display/src/table_display.rs +++ b/src/common/display/src/table_display.rs @@ -2,6 +2,14 @@ pub use comfy_table; const BOLD_TABLE_HEADERS_IN_DISPLAY: &str = "DAFT_BOLD_TABLE_HEADERS"; +pub trait StrValue { + fn str_value(&self, idx: usize) -> String; +} + +pub trait HTMLValue { + fn html_value(&self, idx: usize) -> String; +} + // this should be factored out to a common crate fn create_table_cell(value: &str) -> comfy_table::Cell { let mut attributes = vec![]; @@ -43,10 +51,6 @@ pub fn make_schema_vertical_table( table } -pub trait StrValue { - fn str_value(&self, idx: usize) -> String; -} - pub fn make_comfy_table>( fields: &[S], columns: Option<&[&dyn StrValue]>, diff --git a/src/daft-core/Cargo.toml b/src/daft-core/Cargo.toml index 6a13f6a9e2..e6f6cb0929 100644 --- a/src/daft-core/Cargo.toml +++ b/src/daft-core/Cargo.toml @@ -15,7 +15,6 @@ arrow2 = {workspace = true, features = [ "compute_substring", "io_ipc" ]} -base64 = "0.22.0" bincode = {workspace = true} chrono = {workspace = true} chrono-tz = {workspace = true} @@ -51,11 +50,6 @@ serde_json = {workspace = true} sketches-ddsketch = {workspace = true} unicode-normalization = "0.1.23" -[dependencies.image] -default-features = false -features = ["gif", "jpeg", "ico", "png", "tiff", "webp", "bmp", "hdr"] -version = "0.24.7" - [dependencies.numpy] optional = true version = "0.19" diff --git a/src/daft-core/src/array/image_array.rs b/src/daft-core/src/array/image_array.rs new file mode 100644 index 0000000000..7422760380 --- /dev/null +++ b/src/daft-core/src/array/image_array.rs @@ -0,0 +1,166 @@ +use std::vec; + +use common_error::DaftResult; + +use crate::array::prelude::*; +use crate::datatypes::prelude::*; + +use crate::series::{IntoSeries, Series}; + +#[derive(Clone)] +pub struct BBox(pub u32, pub u32, pub u32, pub u32); + +impl BBox { + pub fn from_u32_arrow_array(arr: &dyn arrow2::array::Array) -> Self { + assert!(arr.len() == 4); + let mut iter = arr + .as_any() + .downcast_ref::() + .unwrap() + .iter(); + BBox( + *iter.next().unwrap().unwrap(), + *iter.next().unwrap().unwrap(), + *iter.next().unwrap().unwrap(), + *iter.next().unwrap().unwrap(), + ) + } +} + +pub struct ImageArraySidecarData { + pub channels: Vec, + pub heights: Vec, + pub widths: Vec, + pub modes: Vec, + pub validity: Option, +} + +impl ImageArray { + pub const IMAGE_DATA_IDX: usize = 0; + pub const IMAGE_CHANNEL_IDX: usize = 1; + pub const IMAGE_HEIGHT_IDX: usize = 2; + pub const IMAGE_WIDTH_IDX: usize = 3; + pub const IMAGE_MODE_IDX: usize = 4; + + pub fn image_mode(&self) -> &Option { + match self.data_type() { + DataType::Image(mode) => mode, + _ => panic!("Expected dtype to be Image"), + } + } + + pub fn data_array(&self) -> &ListArray { + let array = self.physical.children.get(Self::IMAGE_DATA_IDX).unwrap(); + array.list().unwrap() + } + + pub fn channel_array(&self) -> &arrow2::array::UInt16Array { + let array = self.physical.children.get(Self::IMAGE_CHANNEL_IDX).unwrap(); + array.u16().unwrap().as_arrow() + } + + pub fn height_array(&self) -> &arrow2::array::UInt32Array { + let array = self.physical.children.get(Self::IMAGE_HEIGHT_IDX).unwrap(); + array.u32().unwrap().as_arrow() + } + + pub fn width_array(&self) -> &arrow2::array::UInt32Array { + let array = self.physical.children.get(Self::IMAGE_WIDTH_IDX).unwrap(); + array.u32().unwrap().as_arrow() + } + + pub fn mode_array(&self) -> &arrow2::array::UInt8Array { + let array = self.physical.children.get(Self::IMAGE_MODE_IDX).unwrap(); + array.u8().unwrap().as_arrow() + } + + pub fn from_list_array( + name: &str, + data_type: DataType, + data_array: ListArray, + sidecar_data: ImageArraySidecarData, + ) -> DaftResult { + let values: Vec = vec![ + data_array.into_series().rename("data"), + UInt16Array::from(( + "channel", + Box::new( + arrow2::array::UInt16Array::from_vec(sidecar_data.channels) + .with_validity(sidecar_data.validity.clone()), + ), + )) + .into_series(), + UInt32Array::from(( + "height", + Box::new( + arrow2::array::UInt32Array::from_vec(sidecar_data.heights) + .with_validity(sidecar_data.validity.clone()), + ), + )) + .into_series(), + UInt32Array::from(( + "width", + Box::new( + arrow2::array::UInt32Array::from_vec(sidecar_data.widths) + .with_validity(sidecar_data.validity.clone()), + ), + )) + .into_series(), + UInt8Array::from(( + "mode", + Box::new( + arrow2::array::UInt8Array::from_vec(sidecar_data.modes) + .with_validity(sidecar_data.validity.clone()), + ), + )) + .into_series(), + ]; + let physical_type = data_type.to_physical(); + let struct_array = StructArray::new( + Field::new(name, physical_type), + values, + sidecar_data.validity, + ); + Ok(ImageArray::new(Field::new(name, data_type), struct_array)) + } + + pub fn from_vecs( + name: &str, + data_type: DataType, + data: Vec, + offsets: Vec, + sidecar_data: ImageArraySidecarData, + ) -> DaftResult { + if data.is_empty() { + return Ok(ImageArray::full_null(name, &data_type, offsets.len() - 1)); + } + let offsets = arrow2::offset::OffsetsBuffer::try_from(offsets)?; + let arrow_dtype: arrow2::datatypes::DataType = T::PRIMITIVE.into(); + if let DataType::Image(Some(mode)) = &data_type { + if mode.get_dtype().to_arrow()? != arrow_dtype { + panic!("Inner value dtype of provided dtype {data_type:?} is inconsistent with inferred value dtype {arrow_dtype:?}"); + } + } + let data_array = ListArray::new( + Field::new("data", DataType::List(Box::new((&arrow_dtype).into()))), + Series::try_from(( + "data", + Box::new(arrow2::array::PrimitiveArray::from_vec(data)) + as Box, + ))?, + offsets, + sidecar_data.validity.clone(), + ); + + Self::from_list_array(name, data_type, data_array, sidecar_data) + } +} + +impl FixedShapeImageArray { + pub fn image_mode(&self) -> &ImageMode { + match self.data_type() { + DataType::FixedShapeImage(mode, _, _) => mode, + other => panic!("Expected dtype to be Image, got {other:?}"), + } + } +} diff --git a/src/daft-core/src/array/mod.rs b/src/daft-core/src/array/mod.rs index f56faeebd9..76726e5f20 100644 --- a/src/daft-core/src/array/mod.rs +++ b/src/daft-core/src/array/mod.rs @@ -1,6 +1,7 @@ mod fixed_size_list_array; pub mod from; pub mod growable; +pub mod image_array; pub mod iterator; mod list_array; pub mod ops; diff --git a/src/daft-core/src/array/ops/cast.rs b/src/daft-core/src/array/ops/cast.rs index a8f8d46bdb..83b4ee9298 100644 --- a/src/daft-core/src/array/ops/cast.rs +++ b/src/daft-core/src/array/ops/cast.rs @@ -4,7 +4,8 @@ use super::as_arrow::AsArrow; use crate::{ array::{ growable::make_growable, - ops::{from_arrow::FromArrow, full::FullNull, image::ImageArraySidecarData}, + image_array::ImageArraySidecarData, + ops::{from_arrow::FromArrow, full::FullNull}, DataArray, FixedSizeListArray, ListArray, StructArray, }, datatypes::{ diff --git a/src/daft-core/src/array/ops/image.rs b/src/daft-core/src/array/ops/image.rs deleted file mode 100644 index 0e6316cfb1..0000000000 --- a/src/daft-core/src/array/ops/image.rs +++ /dev/null @@ -1,1100 +0,0 @@ -use std::borrow::Cow; -use std::io::{Seek, SeekFrom, Write}; -use std::sync::Arc; -use std::vec; - -use image::{ColorType, DynamicImage, ImageBuffer}; - -use crate::array::prelude::*; -use crate::datatypes::prelude::*; - -use crate::series::{IntoSeries, Series}; -use common_error::{DaftError, DaftResult}; -use image::{Luma, LumaA, Rgb, Rgba}; - -use super::full::FullNull; -use super::{as_arrow::AsArrow, from_arrow::FromArrow}; -use num_traits::FromPrimitive; - -use std::ops::Deref; - -#[derive(Clone)] -pub struct BBox(u32, u32, u32, u32); - -impl BBox { - pub fn from_u32_arrow_array(arr: &dyn arrow2::array::Array) -> Self { - assert!(arr.len() == 4); - let mut iter = arr - .as_any() - .downcast_ref::() - .unwrap() - .iter(); - BBox( - *iter.next().unwrap().unwrap(), - *iter.next().unwrap().unwrap(), - *iter.next().unwrap().unwrap(), - *iter.next().unwrap().unwrap(), - ) - } -} - -#[allow(clippy::upper_case_acronyms, dead_code)] -#[derive(Debug)] -pub enum DaftImageBuffer<'a> { - L(ImageBuffer, Cow<'a, [u8]>>), - LA(ImageBuffer, Cow<'a, [u8]>>), - RGB(ImageBuffer, Cow<'a, [u8]>>), - RGBA(ImageBuffer, Cow<'a, [u8]>>), - L16(ImageBuffer, Cow<'a, [u16]>>), - LA16(ImageBuffer, Cow<'a, [u16]>>), - RGB16(ImageBuffer, Cow<'a, [u16]>>), - RGBA16(ImageBuffer, Cow<'a, [u16]>>), - RGB32F(ImageBuffer, Cow<'a, [f32]>>), - RGBA32F(ImageBuffer, Cow<'a, [f32]>>), -} - -macro_rules! with_method_on_image_buffer { - ( - $key_type:expr, $method: ident -) => {{ - match $key_type { - DaftImageBuffer::L(img) => img.$method(), - DaftImageBuffer::LA(img) => img.$method(), - DaftImageBuffer::RGB(img) => img.$method(), - DaftImageBuffer::RGBA(img) => img.$method(), - DaftImageBuffer::L16(img) => img.$method(), - DaftImageBuffer::LA16(img) => img.$method(), - DaftImageBuffer::RGB16(img) => img.$method(), - DaftImageBuffer::RGBA16(img) => img.$method(), - DaftImageBuffer::RGB32F(img) => img.$method(), - DaftImageBuffer::RGBA32F(img) => img.$method(), - } - }}; -} - -type IOResult = std::result::Result; - -/// A wrapper of a writer that tracks the number of bytes successfully written. -pub struct CountingWriter { - inner: W, - count: u64, -} - -impl CountingWriter { - /// The number of bytes successful written so far. - pub fn count(&self) -> u64 { - self.count - } - - /// Extracts the inner writer, discarding this wrapper. - pub fn into_inner(self) -> W { - self.inner - } -} - -impl From for CountingWriter { - fn from(inner: W) -> Self { - Self { inner, count: 0 } - } -} - -impl Write for CountingWriter { - fn write(&mut self, buf: &[u8]) -> IOResult { - let written = self.inner.write(buf)?; - self.count += written as u64; - Ok(written) - } - - fn flush(&mut self) -> IOResult { - self.inner.flush() - } -} - -impl Seek for CountingWriter { - fn seek(&mut self, pos: SeekFrom) -> IOResult { - self.inner.seek(pos) - } -} - -struct Wrap(T); - -impl From for Wrap { - fn from(image_format: image::ImageFormat) -> Self { - Wrap(match image_format { - image::ImageFormat::Png => ImageFormat::PNG, - image::ImageFormat::Jpeg => ImageFormat::JPEG, - image::ImageFormat::Tiff => ImageFormat::TIFF, - image::ImageFormat::Gif => ImageFormat::GIF, - image::ImageFormat::Bmp => ImageFormat::BMP, - _ => unimplemented!("Image format {:?} is not supported", image_format), - }) - } -} - -impl From> for image::ImageFormat { - fn from(image_format: Wrap) -> Self { - match image_format.0 { - ImageFormat::PNG => image::ImageFormat::Png, - ImageFormat::JPEG => image::ImageFormat::Jpeg, - ImageFormat::TIFF => image::ImageFormat::Tiff, - ImageFormat::GIF => image::ImageFormat::Gif, - ImageFormat::BMP => image::ImageFormat::Bmp, - } - } -} - -impl From> for image::ColorType { - fn from(image_mode: Wrap) -> image::ColorType { - use image::ColorType; - match image_mode.0 { - ImageMode::L => ColorType::L8, - ImageMode::LA => ColorType::La8, - ImageMode::RGB => ColorType::Rgb8, - ImageMode::RGBA => ColorType::Rgba8, - ImageMode::L16 => ColorType::L16, - ImageMode::LA16 => ColorType::La16, - ImageMode::RGB16 => ColorType::Rgb16, - ImageMode::RGBA16 => ColorType::Rgba16, - ImageMode::RGB32F => ColorType::Rgb32F, - ImageMode::RGBA32F => ColorType::Rgba32F, - } - } -} - -impl TryFrom for Wrap { - type Error = DaftError; - - fn try_from(color: image::ColorType) -> DaftResult { - use image::ColorType; - Ok(Wrap(match color { - ColorType::L8 => Ok(ImageMode::L), - ColorType::La8 => Ok(ImageMode::LA), - ColorType::Rgb8 => Ok(ImageMode::RGB), - ColorType::Rgba8 => Ok(ImageMode::RGBA), - ColorType::L16 => Ok(ImageMode::L16), - ColorType::La16 => Ok(ImageMode::LA16), - ColorType::Rgb16 => Ok(ImageMode::RGB16), - ColorType::Rgba16 => Ok(ImageMode::RGBA16), - ColorType::Rgb32F => Ok(ImageMode::RGB32F), - ColorType::Rgba32F => Ok(ImageMode::RGBA32F), - _ => Err(DaftError::ValueError(format!( - "Color type {:?} is not supported.", - color - ))), - }?)) - } -} - -impl<'a> DaftImageBuffer<'a> { - pub fn height(&self) -> u32 { - with_method_on_image_buffer!(self, height) - } - - pub fn width(&self) -> u32 { - with_method_on_image_buffer!(self, width) - } - - pub fn as_u8_slice(&'a self) -> &'a [u8] { - match self { - DaftImageBuffer::L(img) => img.as_raw(), - DaftImageBuffer::LA(img) => img.as_raw(), - DaftImageBuffer::RGB(img) => img.as_raw(), - DaftImageBuffer::RGBA(img) => img.as_raw(), - _ => unimplemented!("unimplemented {self:?}"), - } - } - - pub fn color(&self) -> ColorType { - Wrap(self.mode()).into() - } - - pub fn mode(&self) -> ImageMode { - match self { - DaftImageBuffer::L(..) => ImageMode::L, - DaftImageBuffer::LA(..) => ImageMode::LA, - DaftImageBuffer::RGB(..) => ImageMode::RGB, - DaftImageBuffer::RGBA(..) => ImageMode::RGBA, - DaftImageBuffer::L16(..) => ImageMode::L16, - DaftImageBuffer::LA16(..) => ImageMode::LA16, - DaftImageBuffer::RGB16(..) => ImageMode::RGB16, - DaftImageBuffer::RGBA16(..) => ImageMode::RGBA16, - DaftImageBuffer::RGB32F(..) => ImageMode::RGB32F, - DaftImageBuffer::RGBA32F(..) => ImageMode::RGBA32F, - } - } - - pub fn decode(bytes: &[u8]) -> DaftResult { - image::load_from_memory(bytes) - .map(|v| v.into()) - .map_err(|e| DaftError::ValueError(format!("Decoding image from bytes failed: {}", e))) - } - - pub fn encode(&self, image_format: ImageFormat, writer: &mut W) -> DaftResult<()> - where - W: Write + Seek, - { - image::write_buffer_with_format( - writer, - self.as_u8_slice(), - self.width(), - self.height(), - self.color(), - image::ImageFormat::from(Wrap(image_format)), - ) - .map_err(|e| { - DaftError::ValueError(format!( - "Encoding image into file format {} failed: {}", - image_format, e - )) - }) - } - - pub fn fit_to(&self, w: u32, h: u32) -> Self { - // Preserving aspect ratio, resize an image to fit within the specified dimensions. - let scale_factor = { - let width_scale = w as f64 / self.width() as f64; - let height_scale = h as f64 / self.height() as f64; - width_scale.min(height_scale) - }; - let new_w = self.width() as f64 * scale_factor; - let new_h = self.height() as f64 * scale_factor; - - self.resize(new_w.floor() as u32, new_h.floor() as u32) - } - - pub fn resize(&self, w: u32, h: u32) -> Self { - match self { - DaftImageBuffer::L(imgbuf) => { - let result = - image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle); - DaftImageBuffer::L(image_buffer_vec_to_cow(result)) - } - DaftImageBuffer::LA(imgbuf) => { - let result = - image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle); - DaftImageBuffer::LA(image_buffer_vec_to_cow(result)) - } - DaftImageBuffer::RGB(imgbuf) => { - let result = - image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle); - DaftImageBuffer::RGB(image_buffer_vec_to_cow(result)) - } - DaftImageBuffer::RGBA(imgbuf) => { - let result = - image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle); - DaftImageBuffer::RGBA(image_buffer_vec_to_cow(result)) - } - _ => unimplemented!("Mode {self:?} not implemented"), - } - } - - pub fn crop(&self, bbox: &BBox) -> Self { - // HACK(jay): The `.to_image()` method on SubImage takes in `'static` references for some reason - // This hack will ensure that `&self` adheres to that overly prescriptive bound - let inner = - unsafe { std::mem::transmute::<&DaftImageBuffer<'a>, &DaftImageBuffer<'static>>(self) }; - match inner { - DaftImageBuffer::L(imgbuf) => { - let result = - image::imageops::crop_imm(imgbuf, bbox.0, bbox.1, bbox.2, bbox.3).to_image(); - DaftImageBuffer::L(image_buffer_vec_to_cow(result)) - } - DaftImageBuffer::LA(imgbuf) => { - let result = - image::imageops::crop_imm(imgbuf, bbox.0, bbox.1, bbox.2, bbox.3).to_image(); - DaftImageBuffer::LA(image_buffer_vec_to_cow(result)) - } - DaftImageBuffer::RGB(imgbuf) => { - let result = - image::imageops::crop_imm(imgbuf, bbox.0, bbox.1, bbox.2, bbox.3).to_image(); - DaftImageBuffer::RGB(image_buffer_vec_to_cow(result)) - } - DaftImageBuffer::RGBA(imgbuf) => { - let result = - image::imageops::crop_imm(imgbuf, bbox.0, bbox.1, bbox.2, bbox.3).to_image(); - DaftImageBuffer::RGBA(image_buffer_vec_to_cow(result)) - } - _ => unimplemented!("Mode {self:?} not implemented"), - } - } - - pub fn into_mode(self, mode: ImageMode) -> Self { - let img: DynamicImage = self.into(); - // I couldn't find a method from the image crate to do this - let img: DynamicImage = match mode { - ImageMode::L => img.into_luma8().into(), - ImageMode::LA => img.into_luma_alpha8().into(), - ImageMode::RGB => img.into_rgb8().into(), - ImageMode::RGBA => img.into_rgba8().into(), - ImageMode::L16 => img.into_luma16().into(), - ImageMode::LA16 => img.into_luma_alpha16().into(), - ImageMode::RGB16 => img.into_rgb16().into(), - ImageMode::RGBA16 => img.into_rgba16().into(), - ImageMode::RGB32F => img.into_rgb32f().into(), - ImageMode::RGBA32F => img.into_rgba32f().into(), - }; - img.into() - } -} - -fn image_buffer_vec_to_cow<'a, P, T>(input: ImageBuffer>) -> ImageBuffer> -where - P: image::Pixel, - Vec: Deref, - T: ToOwned + std::clone::Clone, - [T]: ToOwned, -{ - let h = input.height(); - let w = input.width(); - let owned: Cow<[T]> = input.into_raw().into(); - ImageBuffer::from_raw(w, h, owned).unwrap() -} - -fn image_buffer_cow_to_vec(input: ImageBuffer>) -> ImageBuffer> -where - P: image::Pixel, - Vec: Deref, - T: ToOwned + std::clone::Clone, - [T]: ToOwned, -{ - let h = input.height(); - let w = input.width(); - let owned: Vec = input.into_raw().to_vec(); - ImageBuffer::from_raw(w, h, owned).unwrap() -} - -impl<'a> From for DaftImageBuffer<'a> { - fn from(dyn_img: DynamicImage) -> Self { - match dyn_img { - DynamicImage::ImageLuma8(img_buf) => { - DaftImageBuffer::<'a>::L(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageLumaA8(img_buf) => { - DaftImageBuffer::<'a>::LA(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageRgb8(img_buf) => { - DaftImageBuffer::<'a>::RGB(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageRgba8(img_buf) => { - DaftImageBuffer::<'a>::RGBA(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageLuma16(img_buf) => { - DaftImageBuffer::<'a>::L16(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageLumaA16(img_buf) => { - DaftImageBuffer::<'a>::LA16(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageRgb16(img_buf) => { - DaftImageBuffer::<'a>::RGB16(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageRgba16(img_buf) => { - DaftImageBuffer::<'a>::RGBA16(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageRgb32F(img_buf) => { - DaftImageBuffer::<'a>::RGB32F(image_buffer_vec_to_cow(img_buf)) - } - DynamicImage::ImageRgba32F(img_buf) => { - DaftImageBuffer::<'a>::RGBA32F(image_buffer_vec_to_cow(img_buf)) - } - _ => unimplemented!("{dyn_img:?} not implemented"), - } - } -} - -impl<'a> From> for DynamicImage { - fn from(daft_buf: DaftImageBuffer<'a>) -> Self { - match daft_buf { - DaftImageBuffer::L(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::LA(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::RGB(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::RGBA(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::L16(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::LA16(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::RGB16(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::RGBA16(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::RGB32F(buf) => image_buffer_cow_to_vec(buf).into(), - DaftImageBuffer::RGBA32F(buf) => image_buffer_cow_to_vec(buf).into(), - } - } -} - -pub struct ImageArraySidecarData { - pub channels: Vec, - pub heights: Vec, - pub widths: Vec, - pub modes: Vec, - pub validity: Option, -} - -pub trait AsImageObj { - fn name(&self) -> &str; - fn len(&self) -> usize; - fn as_image_obj(&self, idx: usize) -> Option>; -} - -pub struct ImageBufferIter<'a, Arr> -where - Arr: AsImageObj, -{ - cursor: usize, - image_array: &'a Arr, -} - -impl<'a, Arr> ImageBufferIter<'a, Arr> -where - Arr: AsImageObj, -{ - pub fn new(image_array: &'a Arr) -> Self { - Self { - cursor: 0usize, - image_array, - } - } -} - -impl<'a, Arr> Iterator for ImageBufferIter<'a, Arr> -where - Arr: AsImageObj, -{ - type Item = Option>; - - fn next(&mut self) -> Option { - if self.cursor >= self.image_array.len() { - None - } else { - let image_obj = self.image_array.as_image_obj(self.cursor); - self.cursor += 1; - Some(image_obj) - } - } -} - -impl ImageArray { - pub fn image_mode(&self) -> &Option { - match self.data_type() { - DataType::Image(mode) => mode, - _ => panic!("Expected dtype to be Image"), - } - } - - pub fn data_array(&self) -> &ListArray { - const IMAGE_DATA_IDX: usize = 0; - let array = self.physical.children.get(IMAGE_DATA_IDX).unwrap(); - array.list().unwrap() - } - - pub fn channel_array(&self) -> &arrow2::array::UInt16Array { - const IMAGE_CHANNEL_IDX: usize = 1; - let array = self.physical.children.get(IMAGE_CHANNEL_IDX).unwrap(); - array.u16().unwrap().as_arrow() - } - - pub fn height_array(&self) -> &arrow2::array::UInt32Array { - const IMAGE_HEIGHT_IDX: usize = 2; - let array = self.physical.children.get(IMAGE_HEIGHT_IDX).unwrap(); - array.u32().unwrap().as_arrow() - } - - pub fn width_array(&self) -> &arrow2::array::UInt32Array { - const IMAGE_WIDTH_IDX: usize = 3; - let array = self.physical.children.get(IMAGE_WIDTH_IDX).unwrap(); - array.u32().unwrap().as_arrow() - } - - pub fn mode_array(&self) -> &arrow2::array::UInt8Array { - const IMAGE_MODE_IDX: usize = 4; - let array = self.physical.children.get(IMAGE_MODE_IDX).unwrap(); - array.u8().unwrap().as_arrow() - } - - pub fn from_vecs( - name: &str, - data_type: DataType, - data: Vec, - offsets: Vec, - sidecar_data: ImageArraySidecarData, - ) -> DaftResult { - if data.is_empty() { - return Ok(ImageArray::full_null(name, &data_type, offsets.len() - 1)); - } - let offsets = arrow2::offset::OffsetsBuffer::try_from(offsets)?; - let arrow_dtype: arrow2::datatypes::DataType = T::PRIMITIVE.into(); - if let DataType::Image(Some(mode)) = &data_type { - if mode.get_dtype().to_arrow()? != arrow_dtype { - panic!("Inner value dtype of provided dtype {data_type:?} is inconsistent with inferred value dtype {arrow_dtype:?}"); - } - } - let data_array = ListArray::new( - Field::new("data", DataType::List(Box::new((&arrow_dtype).into()))), - Series::try_from(( - "data", - Box::new(arrow2::array::PrimitiveArray::from_vec(data)) - as Box, - ))?, - offsets, - sidecar_data.validity.clone(), - ); - - Self::from_list_array(name, data_type, data_array, sidecar_data) - } - - pub fn from_list_array( - name: &str, - data_type: DataType, - data_array: ListArray, - sidecar_data: ImageArraySidecarData, - ) -> DaftResult { - let values: Vec = vec![ - data_array.into_series().rename("data"), - UInt16Array::from(( - "channel", - Box::new( - arrow2::array::UInt16Array::from_vec(sidecar_data.channels) - .with_validity(sidecar_data.validity.clone()), - ), - )) - .into_series(), - UInt32Array::from(( - "height", - Box::new( - arrow2::array::UInt32Array::from_vec(sidecar_data.heights) - .with_validity(sidecar_data.validity.clone()), - ), - )) - .into_series(), - UInt32Array::from(( - "width", - Box::new( - arrow2::array::UInt32Array::from_vec(sidecar_data.widths) - .with_validity(sidecar_data.validity.clone()), - ), - )) - .into_series(), - UInt8Array::from(( - "mode", - Box::new( - arrow2::array::UInt8Array::from_vec(sidecar_data.modes) - .with_validity(sidecar_data.validity.clone()), - ), - )) - .into_series(), - ]; - let physical_type = data_type.to_physical(); - let struct_array = StructArray::new( - Field::new(name, physical_type), - values, - sidecar_data.validity, - ); - Ok(ImageArray::new(Field::new(name, data_type), struct_array)) - } - - pub fn encode(&self, image_format: ImageFormat) -> DaftResult { - encode_images(self, image_format) - } - - pub fn resize(&self, w: u32, h: u32) -> DaftResult { - let result = resize_images(self, w, h); - Self::from_daft_image_buffers(self.name(), result.as_slice(), self.image_mode()) - } - - pub fn crop(&self, bboxes: &FixedSizeListArray) -> DaftResult { - let mut bboxes_iterator: Box>> = if bboxes.len() == 1 { - Box::new(std::iter::repeat(bboxes.get(0).map(|bbox| { - BBox::from_u32_arrow_array(bbox.u32().unwrap().data()) - }))) - } else { - Box::new((0..bboxes.len()).map(|i| { - bboxes - .get(i) - .map(|bbox| BBox::from_u32_arrow_array(bbox.u32().unwrap().data())) - })) - }; - let result = crop_images(self, &mut bboxes_iterator); - Self::from_daft_image_buffers(self.name(), result.as_slice(), self.image_mode()) - } - - pub fn resize_to_fixed_shape_image_array( - &self, - w: u32, - h: u32, - mode: &ImageMode, - ) -> DaftResult { - let result = resize_images(self, w, h); - FixedShapeImageArray::from_daft_image_buffers(self.name(), result.as_slice(), mode, h, w) - } - - pub fn from_daft_image_buffers( - name: &str, - inputs: &[Option>], - image_mode: &Option, - ) -> DaftResult { - let is_all_u8 = inputs.iter().filter_map(|b| b.as_ref()).all(|b| { - matches!( - b, - DaftImageBuffer::L(..) - | DaftImageBuffer::LA(..) - | DaftImageBuffer::RGB(..) - | DaftImageBuffer::RGBA(..) - ) - }); - assert!(is_all_u8); - - let mut data_ref = Vec::with_capacity(inputs.len()); - let mut heights = Vec::with_capacity(inputs.len()); - let mut channels = Vec::with_capacity(inputs.len()); - let mut modes = Vec::with_capacity(inputs.len()); - let mut widths = Vec::with_capacity(inputs.len()); - let mut offsets = Vec::with_capacity(inputs.len() + 1); - offsets.push(0i64); - let mut validity = arrow2::bitmap::MutableBitmap::with_capacity(inputs.len()); - - for ib in inputs { - validity.push(ib.is_some()); - let (height, width, mode, buffer) = match ib { - Some(ib) => (ib.height(), ib.width(), ib.mode(), ib.as_u8_slice()), - None => (0u32, 0u32, ImageMode::L, &[] as &[u8]), - }; - heights.push(height); - widths.push(width); - modes.push(mode as u8); - channels.push(mode.num_channels()); - data_ref.push(buffer); - offsets.push(offsets.last().unwrap() + buffer.len() as i64); - } - - let data = data_ref.concat(); - let validity: Option = match validity.unset_bits() { - 0 => None, - _ => Some(validity.into()), - }; - Self::from_vecs( - name, - DataType::Image(*image_mode), - data, - offsets, - ImageArraySidecarData { - channels, - heights, - widths, - modes, - validity, - }, - ) - } - - pub fn to_mode(&self, mode: ImageMode) -> DaftResult { - let buffers: Vec> = self - .into_iter() - .map(|img| img.map(|img| img.into_mode(mode))) - .collect(); - Self::from_daft_image_buffers(self.name(), &buffers, &Some(mode)) - } -} - -impl AsImageObj for ImageArray { - fn len(&self) -> usize { - ImageArray::len(self) - } - - fn name(&self) -> &str { - ImageArray::name(self) - } - - fn as_image_obj<'a>(&'a self, idx: usize) -> Option> { - assert!(idx < self.len()); - if !self.physical.is_valid(idx) { - return None; - } - - let da = self.data_array(); - let ca = self.channel_array(); - let ha = self.height_array(); - let wa = self.width_array(); - let ma = self.mode_array(); - - let offsets = da.offsets(); - - let start = *offsets.get(idx).unwrap() as usize; - let end = *offsets.get(idx + 1).unwrap() as usize; - - let values = da - .flat_child - .u8() - .unwrap() - .data() - .as_any() - .downcast_ref::() - .unwrap(); - let slice_data = Cow::Borrowed(&values.values().as_slice()[start..end] as &'a [u8]); - - let c = ca.value(idx); - let h = ha.value(idx); - let w = wa.value(idx); - let m: ImageMode = ImageMode::from_u8(ma.value(idx)).unwrap(); - assert_eq!(m.num_channels(), c); - let result = match m { - ImageMode::L => { - DaftImageBuffer::<'a>::L(ImageBuffer::from_raw(w, h, slice_data).unwrap()) - } - ImageMode::LA => { - DaftImageBuffer::<'a>::LA(ImageBuffer::from_raw(w, h, slice_data).unwrap()) - } - ImageMode::RGB => { - DaftImageBuffer::<'a>::RGB(ImageBuffer::from_raw(w, h, slice_data).unwrap()) - } - ImageMode::RGBA => { - DaftImageBuffer::<'a>::RGBA(ImageBuffer::from_raw(w, h, slice_data).unwrap()) - } - _ => unimplemented!("{m} is currently not implemented!"), - }; - - assert_eq!(result.height(), h); - assert_eq!(result.width(), w); - Some(result) - } -} - -impl FixedShapeImageArray { - fn mode(&self) -> ImageMode { - match &self.field.dtype { - DataType::FixedShapeImage(mode, _, _) => *mode, - _ => panic!("FixedShapeImageArray does not have the correct FixedShapeImage dtype"), - } - } - - pub fn from_daft_image_buffers( - name: &str, - inputs: &[Option>], - image_mode: &ImageMode, - height: u32, - width: u32, - ) -> DaftResult { - let is_all_u8 = inputs.iter().filter_map(|b| b.as_ref()).all(|b| { - matches!( - b, - DaftImageBuffer::L(..) - | DaftImageBuffer::LA(..) - | DaftImageBuffer::RGB(..) - | DaftImageBuffer::RGBA(..) - ) - }); - assert!(is_all_u8); - - let num_channels = image_mode.num_channels(); - let mut data_ref = Vec::with_capacity(inputs.len()); - let mut validity = arrow2::bitmap::MutableBitmap::with_capacity(inputs.len()); - let list_size = (height * width * num_channels as u32) as usize; - let null_list = vec![0u8; list_size]; - for ib in inputs.iter() { - validity.push(ib.is_some()); - let buffer = match ib { - Some(ib) => ib.as_u8_slice(), - None => null_list.as_slice(), - }; - data_ref.push(buffer) - } - let data = data_ref.concat(); - let validity: Option = match validity.unset_bits() { - 0 => None, - _ => Some(validity.into()), - }; - - let arrow_dtype = arrow2::datatypes::DataType::FixedSizeList( - Box::new(arrow2::datatypes::Field::new( - "data", - arrow2::datatypes::DataType::UInt8, - true, - )), - list_size, - ); - let arrow_array = Box::new(arrow2::array::FixedSizeListArray::new( - arrow_dtype.clone(), - Box::new(arrow2::array::PrimitiveArray::from_vec(data)), - validity, - )); - let physical_array = FixedSizeListArray::from_arrow( - Arc::new(Field::new(name, (&arrow_dtype).into())), - arrow_array, - )?; - let logical_dtype = DataType::FixedShapeImage(*image_mode, height, width); - Ok(Self::new(Field::new(name, logical_dtype), physical_array)) - } - - pub fn encode(&self, image_format: ImageFormat) -> DaftResult { - encode_images(self, image_format) - } - - pub fn resize(&self, w: u32, h: u32) -> DaftResult { - let result = resize_images(self, w, h); - match &self.data_type() { - DataType::FixedShapeImage(mode, _, _) => Self::from_daft_image_buffers(self.name(), result.as_slice(), mode, h, w), - dt => panic!("FixedShapeImageArray should always have DataType::FixedShapeImage() as it's dtype, but got {}", dt), - } - } - - pub fn crop(&self, bboxes: &FixedSizeListArray) -> DaftResult { - let mut bboxes_iterator: Box>> = if bboxes.len() == 1 { - Box::new(std::iter::repeat(bboxes.get(0).map(|bbox| { - BBox::from_u32_arrow_array(bbox.u32().unwrap().data()) - }))) - } else { - Box::new((0..bboxes.len()).map(|i| { - bboxes - .get(i) - .map(|bbox| BBox::from_u32_arrow_array(bbox.u32().unwrap().data())) - })) - }; - let result = crop_images(self, &mut bboxes_iterator); - ImageArray::from_daft_image_buffers(self.name(), result.as_slice(), &Some(self.mode())) - } - - pub fn to_mode(&self, mode: ImageMode) -> DaftResult { - let buffers: Vec> = self - .into_iter() - .map(|img| img.map(|img| img.into_mode(mode))) - .collect(); - - let (height, width) = match self.data_type() { - DataType::FixedShapeImage(_, h, w) => (h, w), - _ => unreachable!("self should always be a FixedShapeImage"), - }; - Self::from_daft_image_buffers(self.name(), &buffers, &mode, *height, *width) - } -} - -impl AsImageObj for FixedShapeImageArray { - fn len(&self) -> usize { - FixedShapeImageArray::len(self) - } - - fn name(&self) -> &str { - FixedShapeImageArray::name(self) - } - - fn as_image_obj<'a>(&'a self, idx: usize) -> Option> { - assert!(idx < self.len()); - if !self.physical.is_valid(idx) { - return None; - } - - match self.data_type() { - DataType::FixedShapeImage(mode, height, width) => { - let arrow_array = self.physical.flat_child.downcast::().unwrap().as_arrow(); - let num_channels = mode.num_channels(); - let size = height * width * num_channels as u32; - let start = idx * size as usize; - let end = (idx + 1) * size as usize; - let slice_data = Cow::Borrowed(&arrow_array.values().as_slice()[start..end] as &'a [u8]); - let result = match mode { - ImageMode::L => { - DaftImageBuffer::<'a>::L(ImageBuffer::from_raw(*width, *height, slice_data).unwrap()) - } - ImageMode::LA => { - DaftImageBuffer::<'a>::LA(ImageBuffer::from_raw(*width, *height, slice_data).unwrap()) - } - ImageMode::RGB => { - DaftImageBuffer::<'a>::RGB(ImageBuffer::from_raw(*width, *height, slice_data).unwrap()) - } - ImageMode::RGBA => { - DaftImageBuffer::<'a>::RGBA(ImageBuffer::from_raw(*width, *height, slice_data).unwrap()) - } - _ => unimplemented!("{mode} is currently not implemented!"), - }; - - assert_eq!(result.height(), *height); - assert_eq!(result.width(), *width); - Some(result) - } - dt => panic!("FixedShapeImageArray should always have DataType::FixedShapeImage() as it's dtype, but got {}", dt), - } - } -} - -impl<'a, T> IntoIterator for &'a LogicalArray -where - T: DaftImageryType, - LogicalArray: AsImageObj, -{ - type Item = Option>; - type IntoIter = ImageBufferIter<'a, LogicalArray>; - - fn into_iter(self) -> Self::IntoIter { - ImageBufferIter::new(self) - } -} - -impl BinaryArray { - pub fn image_decode( - &self, - raise_error_on_failure: bool, - mode: Option, - ) -> DaftResult { - let arrow_array = self - .data() - .as_any() - .downcast_ref::>() - .unwrap(); - let mut img_bufs = Vec::>::with_capacity(arrow_array.len()); - let mut cached_dtype: Option = None; - // Load images from binary buffers. - // Confirm that all images have the same value dtype. - for (index, row) in arrow_array.iter().enumerate() { - let mut img_buf = match row.map(DaftImageBuffer::decode).transpose() { - Ok(val) => val, - Err(err) => { - if raise_error_on_failure { - return Err(err); - } else { - log::warn!( - "Error occurred during image decoding at index: {index} {} (falling back to Null)", - err - ); - None - } - } - }; - if let Some(mode) = mode { - img_buf = img_buf.map(|buf| buf.into_mode(mode)); - } - let dtype = img_buf.as_ref().map(|im| im.mode().get_dtype()); - match (dtype.as_ref(), cached_dtype.as_ref()) { - (Some(t1), Some(t2)) => { - if t1 != t2 { - return Err(DaftError::ValueError(format!("All images in a column must have the same dtype, but got: {:?} and {:?}", t1, t2))); - } - } - (Some(t1), None) => { - cached_dtype = Some(t1.clone()); - } - (None, _) => {} - } - img_bufs.push(img_buf); - } - // Fall back to UInt8 dtype if series is all nulls. - let cached_dtype = cached_dtype.unwrap_or(DataType::UInt8); - match cached_dtype { - DataType::UInt8 => Ok(ImageArray::from_daft_image_buffers(self.name(), img_bufs.as_slice(), &mode)?), - _ => unimplemented!("Decoding images of dtype {cached_dtype:?} is not supported, only uint8 images are supported."), - } - } -} - -fn encode_images<'a, Arr>(images: &'a Arr, image_format: ImageFormat) -> DaftResult -where - Arr: AsImageObj, - &'a Arr: IntoIterator>, IntoIter = ImageBufferIter<'a, Arr>>, -{ - let arrow_array = match image_format { - ImageFormat::TIFF => { - // NOTE: A single writer/buffer can't be used for TIFF files because the encoder will overwrite the - // IFD offset for the first image instead of writing it for all subsequent images, producing corrupted - // TIFF files. We work around this by writing out a new buffer for each image. - // TODO(Clark): Fix this in the tiff crate. - let values = images - .into_iter() - .map(|img| { - img.map(|img| { - let buf = Vec::new(); - let mut writer: CountingWriter> = - std::io::BufWriter::new(std::io::Cursor::new(buf)).into(); - img.encode(image_format, &mut writer)?; - // NOTE: BufWriter::into_inner() will flush the buffer. - Ok(writer - .into_inner() - .into_inner() - .map_err(|e| { - DaftError::ValueError(format!( - "Encoding image into file format {} failed: {}", - image_format, e - )) - })? - .into_inner()) - }) - .transpose() - }) - .collect::>>()?; - arrow2::array::BinaryArray::::from_iter(values) - } - _ => { - let mut offsets = Vec::with_capacity(images.len() + 1); - offsets.push(0i64); - let mut validity = arrow2::bitmap::MutableBitmap::with_capacity(images.len()); - let buf = Vec::new(); - let mut writer: CountingWriter> = - std::io::BufWriter::new(std::io::Cursor::new(buf)).into(); - images - .into_iter() - .map(|img| { - match img { - Some(img) => { - img.encode(image_format, &mut writer)?; - offsets.push(writer.count() as i64); - validity.push(true); - } - None => { - offsets.push(*offsets.last().unwrap()); - validity.push(false); - } - } - Ok(()) - }) - .collect::>>()?; - // NOTE: BufWriter::into_inner() will flush the buffer. - let values = writer - .into_inner() - .into_inner() - .map_err(|e| { - DaftError::ValueError(format!( - "Encoding image into file format {} failed: {}", - image_format, e - )) - })? - .into_inner(); - let encoded_data: arrow2::buffer::Buffer = values.into(); - let offsets_buffer = arrow2::offset::OffsetsBuffer::try_from(offsets)?; - let validity: Option = match validity.unset_bits() { - 0 => None, - _ => Some(validity.into()), - }; - arrow2::array::BinaryArray::::new( - arrow2::datatypes::DataType::LargeBinary, - offsets_buffer, - encoded_data, - validity, - ) - } - }; - BinaryArray::new( - Field::new(images.name(), arrow_array.data_type().into()).into(), - arrow_array.boxed(), - ) -} - -fn resize_images<'a, Arr>(images: &'a Arr, w: u32, h: u32) -> Vec> -where - Arr: AsImageObj, - &'a Arr: IntoIterator>, IntoIter = ImageBufferIter<'a, Arr>>, -{ - images - .into_iter() - .map(|img| img.map(|img| img.resize(w, h))) - .collect::>() -} - -fn crop_images<'a, Arr>( - images: &'a Arr, - bboxes: &mut dyn Iterator>, -) -> Vec>> -where - Arr: AsImageObj, - &'a Arr: IntoIterator>, IntoIter = ImageBufferIter<'a, Arr>>, -{ - images - .into_iter() - .zip(bboxes) - .map(|(img, bbox)| match (img, bbox) { - (None, _) | (_, None) => None, - (Some(img), Some(bbox)) => Some(img.crop(&bbox)), - }) - .collect::>() -} diff --git a/src/daft-core/src/array/ops/mod.rs b/src/daft-core/src/array/ops/mod.rs index fd6dd056b1..faeaab5fae 100644 --- a/src/daft-core/src/array/ops/mod.rs +++ b/src/daft-core/src/array/ops/mod.rs @@ -29,7 +29,6 @@ mod hash; mod hll_merge; mod hll_sketch; mod if_else; -pub(crate) mod image; mod is_in; mod json; mod len; diff --git a/src/daft-core/src/array/ops/repr.rs b/src/daft-core/src/array/ops/repr.rs index aeedb30610..03df8366b8 100644 --- a/src/daft-core/src/array/ops/repr.rs +++ b/src/daft-core/src/array/ops/repr.rs @@ -1,4 +1,3 @@ -use base64::Engine; use common_display::table_display::StrValue; use crate::{ @@ -10,7 +9,7 @@ use crate::{ FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray, }, BinaryArray, BooleanArray, DaftNumericType, ExtensionArray, FixedSizeBinaryArray, - ImageFormat, NullArray, UInt64Array, Utf8Array, + NullArray, UInt64Array, Utf8Array, }, series::Series, utils::display::{display_date32, display_decimal128, display_time64, display_timestamp}, @@ -18,8 +17,6 @@ use crate::{ }; use common_error::DaftResult; -use super::image::AsImageObj; - // Default implementation of str_value: format the value with the given format string. macro_rules! impl_array_str_value { ($ArrayT:ty, $fmt:expr) => { @@ -419,52 +416,6 @@ where } } -impl ImageArray { - pub fn html_value(&self, idx: usize) -> String { - let maybe_image = self.as_image_obj(idx); - let str_val = self.str_value(idx).unwrap(); - - match maybe_image { - None => "None".to_string(), - Some(image) => { - let thumb = image.fit_to(128, 128); - let mut bytes: Vec = vec![]; - let mut writer = std::io::BufWriter::new(std::io::Cursor::new(&mut bytes)); - thumb.encode(ImageFormat::JPEG, &mut writer).unwrap(); - drop(writer); - format!( - "\"{}\"", - base64::engine::general_purpose::STANDARD.encode(&mut bytes), - str_val, - ) - } - } - } -} - -impl FixedShapeImageArray { - pub fn html_value(&self, idx: usize) -> String { - let maybe_image = self.as_image_obj(idx); - let str_val = self.str_value(idx).unwrap(); - - match maybe_image { - None => "None".to_string(), - Some(image) => { - let thumb = image.fit_to(128, 128); - let mut bytes: Vec = vec![]; - let mut writer = std::io::BufWriter::new(std::io::Cursor::new(&mut bytes)); - thumb.encode(ImageFormat::JPEG, &mut writer).unwrap(); - drop(writer); - format!( - "\"{}\"", - base64::engine::general_purpose::STANDARD.encode(&mut bytes), - str_val, - ) - } - } - } -} - impl FixedShapeTensorArray { pub fn html_value(&self, idx: usize) -> String { let str_value = self.str_value(idx).unwrap(); diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index 7cea1fc248..ccc072984a 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -14,7 +14,7 @@ use crate::{ DataArray, }, count_mode::CountMode, - datatypes::{DataType, Field, ImageFormat, ImageMode, PythonType}, + datatypes::{DataType, Field, ImageMode, PythonType}, series::{self, IntoSeries, Series}, utils::arrow::{cast_array_for_daft_if_needed, cast_array_from_daft_if_needed}, }; @@ -674,40 +674,6 @@ impl PySeries { Ok(self.series.map_get(&key.series)?.into()) } - pub fn image_decode( - &self, - raise_error_on_failure: bool, - mode: Option, - ) -> PyResult { - Ok(self - .series - .image_decode(raise_error_on_failure, mode)? - .into()) - } - - pub fn image_encode(&self, image_format: ImageFormat) -> PyResult { - Ok(self.series.image_encode(image_format)?.into()) - } - - pub fn image_resize(&self, w: i64, h: i64) -> PyResult { - if w < 0 { - return Err(PyValueError::new_err(format!( - "width can not be negative: {w}" - ))); - } - if h < 0 { - return Err(PyValueError::new_err(format!( - "height can not be negative: {h}" - ))); - } - - Ok(self.series.image_resize(w as u32, h as u32)?.into()) - } - - pub fn image_to_mode(&self, mode: &ImageMode) -> PyResult { - Ok(self.series.image_to_mode(*mode)?.into()) - } - pub fn if_else(&self, other: &Self, predicate: &Self) -> PyResult { Ok(self .series diff --git a/src/daft-core/src/series/array_impl/data_array.rs b/src/daft-core/src/series/array_impl/data_array.rs index e423e17289..3fdaf527fc 100644 --- a/src/daft-core/src/series/array_impl/data_array.rs +++ b/src/daft-core/src/series/array_impl/data_array.rs @@ -124,9 +124,7 @@ macro_rules! impl_series_like_for_data_array { fn str_value(&self, idx: usize) -> DaftResult { self.0.str_value(idx) } - fn html_value(&self, idx: usize) -> String { - self.0.html_value(idx) - } + fn take(&self, idx: &Series) -> DaftResult { with_match_integer_daft_types!(idx.data_type(), |$S| { Ok(self diff --git a/src/daft-core/src/series/array_impl/logical_array.rs b/src/daft-core/src/series/array_impl/logical_array.rs index e1dbaa93c5..9b5ca9e5f6 100644 --- a/src/daft-core/src/series/array_impl/logical_array.rs +++ b/src/daft-core/src/series/array_impl/logical_array.rs @@ -125,10 +125,6 @@ macro_rules! impl_series_like_for_logical_array { self.0.str_value(idx) } - fn html_value(&self, idx: usize) -> String { - self.0.html_value(idx) - } - fn take(&self, idx: &Series) -> DaftResult { with_match_integer_daft_types!(idx.data_type(), |$S| { Ok(self @@ -227,8 +223,8 @@ impl_series_like_for_logical_array!(TimeArray); impl_series_like_for_logical_array!(DurationArray); impl_series_like_for_logical_array!(TimestampArray); impl_series_like_for_logical_array!(ImageArray); +impl_series_like_for_logical_array!(FixedShapeImageArray); impl_series_like_for_logical_array!(TensorArray); impl_series_like_for_logical_array!(EmbeddingArray); -impl_series_like_for_logical_array!(FixedShapeImageArray); impl_series_like_for_logical_array!(FixedShapeTensorArray); impl_series_like_for_logical_array!(MapArray); diff --git a/src/daft-core/src/series/array_impl/nested_array.rs b/src/daft-core/src/series/array_impl/nested_array.rs index b9da4c8dc9..9ec9939d11 100644 --- a/src/daft-core/src/series/array_impl/nested_array.rs +++ b/src/daft-core/src/series/array_impl/nested_array.rs @@ -148,10 +148,6 @@ macro_rules! impl_series_like_for_nested_arrays { self.0.str_value(idx) } - fn html_value(&self, idx: usize) -> String { - self.0.html_value(idx) - } - fn add(&self, rhs: &Series) -> DaftResult { SeriesBinaryOps::add(self, rhs) } diff --git a/src/daft-core/src/series/ops/downcast.rs b/src/daft-core/src/series/ops/downcast.rs index 8c509113c5..8c85dbef39 100644 --- a/src/daft-core/src/series/ops/downcast.rs +++ b/src/daft-core/src/series/ops/downcast.rs @@ -6,6 +6,7 @@ use crate::datatypes::*; use crate::series::array_impl::ArrayWrapper; use crate::series::Series; use common_error::DaftResult; +use logical::{EmbeddingArray, FixedShapeTensorArray, TensorArray}; use self::logical::{DurationArray, ImageArray, MapArray}; @@ -139,4 +140,16 @@ impl Series { pub fn python(&self) -> DaftResult<&PythonArray> { self.downcast() } + + pub fn embedding(&self) -> DaftResult<&EmbeddingArray> { + self.downcast() + } + + pub fn tensor(&self) -> DaftResult<&TensorArray> { + self.downcast() + } + + pub fn fixed_shape_tensor(&self) -> DaftResult<&FixedShapeTensorArray> { + self.downcast() + } } diff --git a/src/daft-core/src/series/ops/image.rs b/src/daft-core/src/series/ops/image.rs deleted file mode 100644 index 11dc55ffe8..0000000000 --- a/src/daft-core/src/series/ops/image.rs +++ /dev/null @@ -1,101 +0,0 @@ -use crate::datatypes::logical::{FixedShapeImageArray, ImageArray}; -use crate::datatypes::{DataType, ImageFormat, ImageMode}; - -use crate::series::{IntoSeries, Series}; -use common_error::{DaftError, DaftResult}; - -impl Series { - pub fn image_decode( - &self, - raise_error_on_failure: bool, - mode: Option, - ) -> DaftResult { - match self.data_type() { - DataType::Binary => Ok(self.binary()?.image_decode(raise_error_on_failure, mode)?.into_series()), - dtype => Err(DaftError::ValueError(format!( - "Decoding in-memory data into images is only supported for binary arrays, but got {}", dtype - ))), - } - } - - pub fn image_encode(&self, image_format: ImageFormat) -> DaftResult { - match self.data_type() { - DataType::Image(..) => Ok(self - .downcast::()? - .encode(image_format)? - .into_series()), - DataType::FixedShapeImage(..) => Ok(self - .downcast::()? - .encode(image_format)? - .into_series()), - dtype => Err(DaftError::ValueError(format!( - "Encoding images into bytes is only supported for image arrays, but got {}", - dtype - ))), - } - } - - pub fn image_resize(&self, w: u32, h: u32) -> DaftResult { - match self.data_type() { - DataType::Image(mode) => { - let array = self.downcast::()?; - match mode { - // If the image mode is specified at the type-level (and is therefore guaranteed to be consistent - // across all images across all partitions), store the resized image in a fixed shape image array, - // since we'll have homogeneous modes, heights, and widths after resizing. - Some(mode) => Ok(array - .resize_to_fixed_shape_image_array(w, h, mode)? - .into_series()), - None => Ok(array.resize(w, h)?.into_series()), - } - } - DataType::FixedShapeImage(..) => Ok(self - .downcast::()? - .resize(w, h)? - .into_series()), - _ => Err(DaftError::ValueError(format!( - "datatype: {} does not support Image Resize. Occurred while resizing Series: {}", - self.data_type(), - self.name() - ))), - } - } - - pub fn image_crop(&self, bbox: &Series) -> DaftResult { - let bbox_type = DataType::FixedSizeList(Box::new(DataType::UInt32), 4); - let bbox = bbox.cast(&bbox_type)?; - let bbox = bbox.fixed_size_list()?; - - match &self.data_type() { - DataType::Image(_) => self - .downcast::()? - .crop(bbox) - .map(|arr| arr.into_series()), - DataType::FixedShapeImage(..) => self - .fixed_size_image()? - .crop(bbox) - .map(|arr| arr.into_series()), - dt => Err(DaftError::ValueError(format!( - "Expected input to crop to be an Image type, but received: {}", - dt - ))), - } - } - - pub fn image_to_mode(&self, mode: ImageMode) -> DaftResult { - match &self.data_type() { - DataType::Image(_) => self - .downcast::()? - .to_mode(mode) - .map(|arr| arr.into_series()), - DataType::FixedShapeImage(..) => self - .fixed_size_image()? - .to_mode(mode) - .map(|arr| arr.into_series()), - dt => Err(DaftError::ValueError(format!( - "Expected input to crop to be an Image type, but received: {}", - dt - ))), - } - } -} diff --git a/src/daft-core/src/series/ops/mod.rs b/src/daft-core/src/series/ops/mod.rs index 6f1b6e759e..66f7b9b91e 100644 --- a/src/daft-core/src/series/ops/mod.rs +++ b/src/daft-core/src/series/ops/mod.rs @@ -21,7 +21,6 @@ pub mod floor; pub mod groups; pub mod hash; pub mod if_else; -pub mod image; pub mod is_in; pub mod json; pub mod len; diff --git a/src/daft-core/src/series/ops/take.rs b/src/daft-core/src/series/ops/take.rs index 4f7dfb2a9a..c400a0e5a3 100644 --- a/src/daft-core/src/series/ops/take.rs +++ b/src/daft-core/src/series/ops/take.rs @@ -24,10 +24,6 @@ impl Series { self.inner.take(idx) } - pub fn html_value(&self, idx: usize) -> String { - self.inner.html_value(idx) - } - pub fn to_str_values(&self) -> DaftResult { let iter = IndexRange::new(0i64, self.len() as i64).map(|i| Some(self.str_value(i as usize))); diff --git a/src/daft-core/src/series/series_like.rs b/src/daft-core/src/series/series_like.rs index 980e6784fb..10d75b7540 100644 --- a/src/daft-core/src/series/series_like.rs +++ b/src/daft-core/src/series/series_like.rs @@ -34,7 +34,6 @@ pub trait SeriesLike: Send + Sync + Any + std::fmt::Debug { fn slice(&self, start: usize, end: usize) -> DaftResult; fn take(&self, idx: &Series) -> DaftResult; fn str_value(&self, idx: usize) -> DaftResult; - fn html_value(&self, idx: usize) -> String; fn add(&self, rhs: &Series) -> DaftResult; fn sub(&self, rhs: &Series) -> DaftResult; fn mul(&self, rhs: &Series) -> DaftResult; diff --git a/src/daft-functions/Cargo.toml b/src/daft-functions/Cargo.toml index 92b2d1bd1a..bd9281b4ea 100644 --- a/src/daft-functions/Cargo.toml +++ b/src/daft-functions/Cargo.toml @@ -5,6 +5,7 @@ common-error = {path = "../common/error", default-features = false} common-io-config = {path = "../common/io-config", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} +daft-image = {path = "../daft-image", default-features = false} daft-io = {path = "../daft-io", default-features = false} futures = {workspace = true} pyo3 = {workspace = true, optional = true} @@ -22,6 +23,7 @@ python = [ "common-error/python", "daft-core/python", "daft-io/python", + "daft-image/python", "common-io-config/python" ] diff --git a/src/daft-functions/src/image/crop.rs b/src/daft-functions/src/image/crop.rs index e687174339..c350914340 100644 --- a/src/daft-functions/src/image/crop.rs +++ b/src/daft-functions/src/image/crop.rs @@ -70,7 +70,7 @@ impl ScalarUDF for ImageCrop { fn evaluate(&self, inputs: &[Series]) -> DaftResult { match inputs { - [input, bbox] => input.image_crop(bbox), + [input, bbox] => daft_image::series::crop(input, bbox), _ => Err(DaftError::ValueError(format!( "Expected 2 input args, got {}", inputs.len() diff --git a/src/daft-functions/src/image/decode.rs b/src/daft-functions/src/image/decode.rs index 121c50555e..72173b16d6 100644 --- a/src/daft-functions/src/image/decode.rs +++ b/src/daft-functions/src/image/decode.rs @@ -64,7 +64,7 @@ impl ScalarUDF for ImageDecode { fn evaluate(&self, inputs: &[Series]) -> DaftResult { let raise_error_on_failure = self.raise_on_error; match inputs { - [input] => input.image_decode(raise_error_on_failure, self.mode), + [input] => daft_image::series::decode(input, raise_error_on_failure, self.mode), _ => Err(DaftError::ValueError(format!( "Expected 1 input arg, got {}", inputs.len() diff --git a/src/daft-functions/src/image/encode.rs b/src/daft-functions/src/image/encode.rs index 9b42c14d87..5c465ccbb4 100644 --- a/src/daft-functions/src/image/encode.rs +++ b/src/daft-functions/src/image/encode.rs @@ -49,7 +49,7 @@ impl ScalarUDF for ImageEncode { fn evaluate(&self, inputs: &[Series]) -> DaftResult { match inputs { - [input] => input.image_encode(self.image_format), + [input] => daft_image::series::encode(input, self.image_format), _ => Err(DaftError::ValueError(format!( "Expected 1 input arg, got {}", inputs.len() diff --git a/src/daft-functions/src/image/resize.rs b/src/daft-functions/src/image/resize.rs index 24c0bf0717..ce493f91ac 100644 --- a/src/daft-functions/src/image/resize.rs +++ b/src/daft-functions/src/image/resize.rs @@ -50,7 +50,7 @@ impl ScalarUDF for ImageResize { fn evaluate(&self, inputs: &[Series]) -> DaftResult { match inputs { - [input] => input.image_resize(self.width, self.height), + [input] => daft_image::series::resize(input, self.width, self.height), _ => Err(DaftError::ValueError(format!( "Expected 1 input arg, got {}", inputs.len() diff --git a/src/daft-functions/src/image/to_mode.rs b/src/daft-functions/src/image/to_mode.rs index 4648c1b04a..349b56304a 100644 --- a/src/daft-functions/src/image/to_mode.rs +++ b/src/daft-functions/src/image/to_mode.rs @@ -49,7 +49,7 @@ impl ScalarUDF for ImageToMode { fn evaluate(&self, inputs: &[Series]) -> DaftResult { match inputs { - [input] => input.image_to_mode(self.mode), + [input] => daft_image::series::to_mode(input, self.mode), _ => Err(DaftError::ValueError(format!( "Expected 1 input arg, got {}", inputs.len() diff --git a/src/daft-image/Cargo.toml b/src/daft-image/Cargo.toml new file mode 100644 index 0000000000..6dab7b3b8f --- /dev/null +++ b/src/daft-image/Cargo.toml @@ -0,0 +1,24 @@ +[dependencies] +arrow2 = {workspace = true} +common-error = {path = "../common/error", default-features = false} +daft-core = {path = "../daft-core", default-features = false} +log = {workspace = true} +num-traits = "0.2.19" +pyo3 = {workspace = true, optional = true} +base64.workspace = true + +[dependencies.image] +default-features = false +features = ["gif", "jpeg", "ico", "png", "tiff", "webp", "bmp", "hdr"] +version = "0.24.7" + +[features] +python = [ + "dep:pyo3", + "common-error/python" +] + +[package] +name = "daft-image" +edition.workspace = true +version.workspace = true diff --git a/src/daft-image/src/counting_writer.rs b/src/daft-image/src/counting_writer.rs new file mode 100644 index 0000000000..d01e65b3d1 --- /dev/null +++ b/src/daft-image/src/counting_writer.rs @@ -0,0 +1,45 @@ +use std::io::{Seek, SeekFrom, Write}; + +type IOResult = std::result::Result; + +/// A wrapper of a writer that tracks the number of bytes successfully written. +pub struct CountingWriter { + inner: W, + count: u64, +} + +impl CountingWriter { + /// The number of bytes successful written so far. + pub fn count(&self) -> u64 { + self.count + } + + /// Extracts the inner writer, discarding this wrapper. + pub fn into_inner(self) -> W { + self.inner + } +} + +impl From for CountingWriter { + fn from(inner: W) -> Self { + Self { inner, count: 0 } + } +} + +impl Write for CountingWriter { + fn write(&mut self, buf: &[u8]) -> IOResult { + let written = self.inner.write(buf)?; + self.count += written as u64; + Ok(written) + } + + fn flush(&mut self) -> IOResult { + self.inner.flush() + } +} + +impl Seek for CountingWriter { + fn seek(&mut self, pos: SeekFrom) -> IOResult { + self.inner.seek(pos) + } +} diff --git a/src/daft-image/src/image_buffer.rs b/src/daft-image/src/image_buffer.rs new file mode 100644 index 0000000000..0855fd4ec8 --- /dev/null +++ b/src/daft-image/src/image_buffer.rs @@ -0,0 +1,317 @@ +use common_error::{DaftError, DaftResult}; +use daft_core::array::image_array::BBox; +use daft_core::datatypes::prelude::*; +use image::{ColorType, DynamicImage, ImageBuffer}; +use image::{Luma, LumaA, Rgb, Rgba}; +use std::borrow::Cow; +use std::io::{Seek, Write}; +use std::ops::Deref; + +#[allow(clippy::upper_case_acronyms, dead_code)] +#[derive(Debug)] +pub enum DaftImageBuffer<'a> { + L(ImageBuffer, Cow<'a, [u8]>>), + LA(ImageBuffer, Cow<'a, [u8]>>), + RGB(ImageBuffer, Cow<'a, [u8]>>), + RGBA(ImageBuffer, Cow<'a, [u8]>>), + L16(ImageBuffer, Cow<'a, [u16]>>), + LA16(ImageBuffer, Cow<'a, [u16]>>), + RGB16(ImageBuffer, Cow<'a, [u16]>>), + RGBA16(ImageBuffer, Cow<'a, [u16]>>), + RGB32F(ImageBuffer, Cow<'a, [f32]>>), + RGBA32F(ImageBuffer, Cow<'a, [f32]>>), +} + +macro_rules! with_method_on_image_buffer { + ( + $key_type:expr, $method: ident +) => {{ + use DaftImageBuffer::*; + + match $key_type { + L(img) => img.$method(), + LA(img) => img.$method(), + RGB(img) => img.$method(), + RGBA(img) => img.$method(), + L16(img) => img.$method(), + LA16(img) => img.$method(), + RGB16(img) => img.$method(), + RGBA16(img) => img.$method(), + RGB32F(img) => img.$method(), + RGBA32F(img) => img.$method(), + } + }}; +} + +impl<'a> DaftImageBuffer<'a> { + pub fn from_raw( + mode: &ImageMode, + width: u32, + height: u32, + data: Cow<'a, [u8]>, + ) -> DaftImageBuffer<'a> { + use DaftImageBuffer::*; + match mode { + ImageMode::L => L(ImageBuffer::from_raw(width, height, data).unwrap()), + ImageMode::LA => LA(ImageBuffer::from_raw(width, height, data).unwrap()), + ImageMode::RGB => RGB(ImageBuffer::from_raw(width, height, data).unwrap()), + ImageMode::RGBA => RGBA(ImageBuffer::from_raw(width, height, data).unwrap()), + _ => unimplemented!("{mode} is currently not implemented!"), + } + } + pub fn height(&self) -> u32 { + with_method_on_image_buffer!(self, height) + } + + pub fn width(&self) -> u32 { + with_method_on_image_buffer!(self, width) + } + + pub fn as_u8_slice(&self) -> &[u8] { + use DaftImageBuffer::*; + match self { + L(img) => img.as_raw(), + LA(img) => img.as_raw(), + RGB(img) => img.as_raw(), + RGBA(img) => img.as_raw(), + _ => unimplemented!("unimplemented {self:?}"), + } + } + pub fn mode(&self) -> ImageMode { + use DaftImageBuffer::*; + + match self { + L(..) => ImageMode::L, + LA(..) => ImageMode::LA, + RGB(..) => ImageMode::RGB, + RGBA(..) => ImageMode::RGBA, + L16(..) => ImageMode::L16, + LA16(..) => ImageMode::LA16, + RGB16(..) => ImageMode::RGB16, + RGBA16(..) => ImageMode::RGBA16, + RGB32F(..) => ImageMode::RGB32F, + RGBA32F(..) => ImageMode::RGBA32F, + } + } + pub fn color(&self) -> ColorType { + let mode = DaftImageBuffer::mode(self); + use ImageMode::*; + match mode { + L => ColorType::L8, + LA => ColorType::La8, + RGB => ColorType::Rgb8, + RGBA => ColorType::Rgba8, + L16 => ColorType::L16, + LA16 => ColorType::La16, + RGB16 => ColorType::Rgb16, + RGBA16 => ColorType::Rgba16, + RGB32F => ColorType::Rgb32F, + RGBA32F => ColorType::Rgba32F, + } + } + + pub fn decode(bytes: &[u8]) -> DaftResult { + image::load_from_memory(bytes) + .map(|v| v.into()) + .map_err(|e| DaftError::ValueError(format!("Decoding image from bytes failed: {}", e))) + } + + pub fn encode(&self, image_format: ImageFormat, writer: &mut W) -> DaftResult<()> + where + W: Write + Seek, + { + image::write_buffer_with_format( + writer, + self.as_u8_slice(), + self.width(), + self.height(), + self.color(), + convert_img_fmt(image_format), + ) + .map_err(|e| { + DaftError::ValueError(format!( + "Encoding image into file format {} failed: {}", + image_format, e + )) + }) + } + + pub fn fit_to(&self, w: u32, h: u32) -> Self { + // Preserving aspect ratio, resize an image to fit within the specified dimensions. + let scale_factor = { + let width_scale = w as f64 / self.width() as f64; + let height_scale = h as f64 / self.height() as f64; + width_scale.min(height_scale) + }; + let new_w = self.width() as f64 * scale_factor; + let new_h = self.height() as f64 * scale_factor; + + self.resize(new_w.floor() as u32, new_h.floor() as u32) + } + + pub fn resize(&self, w: u32, h: u32) -> Self { + use DaftImageBuffer::*; + match self { + L(imgbuf) => { + let result = + image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle); + DaftImageBuffer::L(image_buffer_vec_to_cow(result)) + } + LA(imgbuf) => { + let result = + image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle); + DaftImageBuffer::LA(image_buffer_vec_to_cow(result)) + } + RGB(imgbuf) => { + let result = + image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle); + DaftImageBuffer::RGB(image_buffer_vec_to_cow(result)) + } + RGBA(imgbuf) => { + let result = + image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle); + DaftImageBuffer::RGBA(image_buffer_vec_to_cow(result)) + } + _ => unimplemented!("Mode {self:?} not implemented"), + } + } + + pub fn crop(&self, bbox: &BBox) -> Self { + // HACK(jay): The `.to_image()` method on SubImage takes in `'static` references for some reason + // This hack will ensure that `&self` adheres to that overly prescriptive bound + let inner = + unsafe { std::mem::transmute::<&DaftImageBuffer<'a>, &DaftImageBuffer<'static>>(self) }; + match inner { + DaftImageBuffer::L(imgbuf) => { + let result = + image::imageops::crop_imm(imgbuf, bbox.0, bbox.1, bbox.2, bbox.3).to_image(); + DaftImageBuffer::L(image_buffer_vec_to_cow(result)) + } + DaftImageBuffer::LA(imgbuf) => { + let result = + image::imageops::crop_imm(imgbuf, bbox.0, bbox.1, bbox.2, bbox.3).to_image(); + DaftImageBuffer::LA(image_buffer_vec_to_cow(result)) + } + DaftImageBuffer::RGB(imgbuf) => { + let result = + image::imageops::crop_imm(imgbuf, bbox.0, bbox.1, bbox.2, bbox.3).to_image(); + DaftImageBuffer::RGB(image_buffer_vec_to_cow(result)) + } + DaftImageBuffer::RGBA(imgbuf) => { + let result = + image::imageops::crop_imm(imgbuf, bbox.0, bbox.1, bbox.2, bbox.3).to_image(); + DaftImageBuffer::RGBA(image_buffer_vec_to_cow(result)) + } + _ => unimplemented!("Mode {self:?} not implemented"), + } + } + + pub fn into_mode(self, mode: ImageMode) -> Self { + let img: DynamicImage = self.into(); + // I couldn't find a method from the image crate to do this + let img: DynamicImage = match mode { + ImageMode::L => img.into_luma8().into(), + ImageMode::LA => img.into_luma_alpha8().into(), + ImageMode::RGB => img.into_rgb8().into(), + ImageMode::RGBA => img.into_rgba8().into(), + ImageMode::L16 => img.into_luma16().into(), + ImageMode::LA16 => img.into_luma_alpha16().into(), + ImageMode::RGB16 => img.into_rgb16().into(), + ImageMode::RGBA16 => img.into_rgba16().into(), + ImageMode::RGB32F => img.into_rgb32f().into(), + ImageMode::RGBA32F => img.into_rgba32f().into(), + }; + img.into() + } +} + +fn image_buffer_vec_to_cow<'a, P, T>(input: ImageBuffer>) -> ImageBuffer> +where + P: image::Pixel, + Vec: Deref, + T: ToOwned + std::clone::Clone, + [T]: ToOwned, +{ + let h = input.height(); + let w = input.width(); + let owned: Cow<[T]> = input.into_raw().into(); + ImageBuffer::from_raw(w, h, owned).unwrap() +} + +fn image_buffer_cow_to_vec(input: ImageBuffer>) -> ImageBuffer> +where + P: image::Pixel, + Vec: Deref, + T: ToOwned + std::clone::Clone, + [T]: ToOwned, +{ + let h = input.height(); + let w = input.width(); + let owned: Vec = input.into_raw().to_vec(); + ImageBuffer::from_raw(w, h, owned).unwrap() +} + +impl<'a> From for DaftImageBuffer<'a> { + fn from(dyn_img: DynamicImage) -> Self { + match dyn_img { + DynamicImage::ImageLuma8(img_buf) => { + DaftImageBuffer::<'a>::L(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageLumaA8(img_buf) => { + DaftImageBuffer::<'a>::LA(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageRgb8(img_buf) => { + DaftImageBuffer::<'a>::RGB(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageRgba8(img_buf) => { + DaftImageBuffer::<'a>::RGBA(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageLuma16(img_buf) => { + DaftImageBuffer::<'a>::L16(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageLumaA16(img_buf) => { + DaftImageBuffer::<'a>::LA16(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageRgb16(img_buf) => { + DaftImageBuffer::<'a>::RGB16(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageRgba16(img_buf) => { + DaftImageBuffer::<'a>::RGBA16(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageRgb32F(img_buf) => { + DaftImageBuffer::<'a>::RGB32F(image_buffer_vec_to_cow(img_buf)) + } + DynamicImage::ImageRgba32F(img_buf) => { + DaftImageBuffer::<'a>::RGBA32F(image_buffer_vec_to_cow(img_buf)) + } + _ => unimplemented!("{dyn_img:?} not implemented"), + } + } +} + +impl<'a> From> for DynamicImage { + fn from(daft_buf: DaftImageBuffer<'a>) -> Self { + match daft_buf { + DaftImageBuffer::L(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::LA(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::RGB(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::RGBA(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::L16(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::LA16(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::RGB16(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::RGBA16(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::RGB32F(buf) => image_buffer_cow_to_vec(buf).into(), + DaftImageBuffer::RGBA32F(buf) => image_buffer_cow_to_vec(buf).into(), + } + } +} + +fn convert_img_fmt(fmt: ImageFormat) -> image::ImageFormat { + match fmt { + ImageFormat::PNG => image::ImageFormat::Png, + ImageFormat::JPEG => image::ImageFormat::Jpeg, + ImageFormat::TIFF => image::ImageFormat::Tiff, + ImageFormat::GIF => image::ImageFormat::Gif, + ImageFormat::BMP => image::ImageFormat::Bmp, + } +} diff --git a/src/daft-image/src/iters.rs b/src/daft-image/src/iters.rs new file mode 100644 index 0000000000..7a8183e655 --- /dev/null +++ b/src/daft-image/src/iters.rs @@ -0,0 +1,38 @@ +use crate::{ops::AsImageObj, DaftImageBuffer}; + +pub struct ImageBufferIter<'a, Arr> +where + Arr: AsImageObj, +{ + cursor: usize, + image_array: &'a Arr, +} + +impl<'a, Arr> ImageBufferIter<'a, Arr> +where + Arr: AsImageObj, +{ + pub fn new(image_array: &'a Arr) -> Self { + Self { + cursor: 0usize, + image_array, + } + } +} + +impl<'a, Arr> Iterator for ImageBufferIter<'a, Arr> +where + Arr: AsImageObj, +{ + type Item = Option>; + + fn next(&mut self) -> Option { + if self.cursor >= self.image_array.len() { + None + } else { + let image_obj = self.image_array.as_image_obj(self.cursor); + self.cursor += 1; + Some(image_obj) + } + } +} diff --git a/src/daft-image/src/lib.rs b/src/daft-image/src/lib.rs new file mode 100644 index 0000000000..d5dbcfadd9 --- /dev/null +++ b/src/daft-image/src/lib.rs @@ -0,0 +1,10 @@ +mod counting_writer; +mod image_buffer; +mod iters; +pub mod ops; +use counting_writer::CountingWriter; +use image_buffer::DaftImageBuffer; +pub mod series; + +#[cfg(feature = "python")] +pub mod python; diff --git a/src/daft-image/src/ops.rs b/src/daft-image/src/ops.rs new file mode 100644 index 0000000000..c949df9463 --- /dev/null +++ b/src/daft-image/src/ops.rs @@ -0,0 +1,487 @@ +use crate::{iters::*, CountingWriter, DaftImageBuffer}; +use base64::Engine; +use common_error::{DaftError, DaftResult}; +use daft_core::array::image_array::{BBox, ImageArraySidecarData}; +use daft_core::array::prelude::*; +use daft_core::datatypes::prelude::*; +use daft_core::prelude::ImageArray; +use num_traits::FromPrimitive; +use std::borrow::Cow; +use std::sync::Arc; + +#[allow(clippy::len_without_is_empty)] +pub trait AsImageObj { + fn name(&self) -> &str; + fn len(&self) -> usize; + fn as_image_obj(&self, idx: usize) -> Option>; +} + +pub trait ImageOps { + fn encode(&self, image_format: ImageFormat) -> DaftResult; + fn resize(&self, w: u32, h: u32) -> DaftResult + where + Self: Sized; + fn crop(&self, bboxes: &FixedSizeListArray) -> DaftResult + where + Self: Sized; + fn resize_to_fixed_shape_image_array( + &self, + w: u32, + h: u32, + mode: &ImageMode, + ) -> DaftResult; + fn to_mode(&self, mode: ImageMode) -> DaftResult + where + Self: Sized; +} + +pub(crate) fn image_array_from_img_buffers( + name: &str, + inputs: &[Option>], + image_mode: &Option, +) -> DaftResult { + use DaftImageBuffer::*; + let is_all_u8 = inputs + .iter() + .filter_map(|b| b.as_ref()) + .all(|b| matches!(b, L(..) | LA(..) | RGB(..) | RGBA(..))); + assert!(is_all_u8); + + let mut data_ref = Vec::with_capacity(inputs.len()); + let mut heights = Vec::with_capacity(inputs.len()); + let mut channels = Vec::with_capacity(inputs.len()); + let mut modes = Vec::with_capacity(inputs.len()); + let mut widths = Vec::with_capacity(inputs.len()); + let mut offsets = Vec::with_capacity(inputs.len() + 1); + offsets.push(0i64); + let mut validity = arrow2::bitmap::MutableBitmap::with_capacity(inputs.len()); + + for ib in inputs { + validity.push(ib.is_some()); + let (height, width, mode, buffer) = match ib { + Some(ib) => (ib.height(), ib.width(), ib.mode(), ib.as_u8_slice()), + None => (0u32, 0u32, ImageMode::L, &[] as &[u8]), + }; + heights.push(height); + widths.push(width); + modes.push(mode as u8); + channels.push(mode.num_channels()); + data_ref.push(buffer); + offsets.push(offsets.last().unwrap() + buffer.len() as i64); + } + + let data = data_ref.concat(); + let validity: Option = match validity.unset_bits() { + 0 => None, + _ => Some(validity.into()), + }; + ImageArray::from_vecs( + name, + DataType::Image(*image_mode), + data, + offsets, + ImageArraySidecarData { + channels, + heights, + widths, + modes, + validity, + }, + ) +} + +pub(crate) fn fixed_image_array_from_img_buffers( + name: &str, + inputs: &[Option>], + image_mode: &ImageMode, + height: u32, + width: u32, +) -> DaftResult { + use DaftImageBuffer::*; + let is_all_u8 = inputs + .iter() + .filter_map(|b| b.as_ref()) + .all(|b| matches!(b, L(..) | LA(..) | RGB(..) | RGBA(..))); + assert!(is_all_u8); + + let num_channels = image_mode.num_channels(); + let mut data_ref = Vec::with_capacity(inputs.len()); + let mut validity = arrow2::bitmap::MutableBitmap::with_capacity(inputs.len()); + let list_size = (height * width * num_channels as u32) as usize; + let null_list = vec![0u8; list_size]; + for ib in inputs.iter() { + validity.push(ib.is_some()); + let buffer = match ib { + Some(ib) => ib.as_u8_slice(), + None => null_list.as_slice(), + }; + data_ref.push(buffer) + } + let data = data_ref.concat(); + let validity: Option = match validity.unset_bits() { + 0 => None, + _ => Some(validity.into()), + }; + + let arrow_dtype = arrow2::datatypes::DataType::FixedSizeList( + Box::new(arrow2::datatypes::Field::new( + "data", + arrow2::datatypes::DataType::UInt8, + true, + )), + list_size, + ); + let arrow_array = Box::new(arrow2::array::FixedSizeListArray::new( + arrow_dtype.clone(), + Box::new(arrow2::array::PrimitiveArray::from_vec(data)), + validity, + )); + let physical_array = FixedSizeListArray::from_arrow( + Arc::new(Field::new(name, (&arrow_dtype).into())), + arrow_array, + )?; + let logical_dtype = DataType::FixedShapeImage(*image_mode, height, width); + Ok(FixedShapeImageArray::new( + Field::new(name, logical_dtype), + physical_array, + )) +} + +impl ImageOps for ImageArray { + fn encode(&self, image_format: ImageFormat) -> DaftResult { + encode_images(self, image_format) + } + + fn resize(&self, w: u32, h: u32) -> DaftResult { + let result = resize_images(self, w, h); + image_array_from_img_buffers(self.name(), result.as_slice(), self.image_mode()) + } + + fn crop(&self, bboxes: &FixedSizeListArray) -> DaftResult { + let mut bboxes_iterator: Box>> = if bboxes.len() == 1 { + Box::new(std::iter::repeat(bboxes.get(0).map(|bbox| { + BBox::from_u32_arrow_array(bbox.u32().unwrap().data()) + }))) + } else { + Box::new((0..bboxes.len()).map(|i| { + bboxes + .get(i) + .map(|bbox| BBox::from_u32_arrow_array(bbox.u32().unwrap().data())) + })) + }; + let result = crop_images(self, &mut bboxes_iterator); + image_array_from_img_buffers(self.name(), result.as_slice(), self.image_mode()) + } + + fn resize_to_fixed_shape_image_array( + &self, + w: u32, + h: u32, + mode: &ImageMode, + ) -> DaftResult { + let result = resize_images(self, w, h); + fixed_image_array_from_img_buffers(self.name(), result.as_slice(), mode, h, w) + } + + fn to_mode(&self, mode: ImageMode) -> DaftResult { + let buffers: Vec> = ImageBufferIter::new(self) + .map(|img| img.map(|img| img.into_mode(mode))) + .collect(); + image_array_from_img_buffers(self.name(), &buffers, &Some(mode)) + } +} + +impl ImageOps for FixedShapeImageArray { + fn encode(&self, image_format: ImageFormat) -> DaftResult { + encode_images(self, image_format) + } + + fn resize(&self, w: u32, h: u32) -> DaftResult + where + Self: Sized, + { + let result = resize_images(self, w, h); + let mode = self.image_mode(); + fixed_image_array_from_img_buffers(self.name(), result.as_slice(), mode, h, w) + } + + fn crop(&self, bboxes: &FixedSizeListArray) -> DaftResult + where + Self: Sized, + { + let mut bboxes_iterator: Box>> = if bboxes.len() == 1 { + Box::new(std::iter::repeat(bboxes.get(0).map(|bbox| { + BBox::from_u32_arrow_array(bbox.u32().unwrap().data()) + }))) + } else { + Box::new((0..bboxes.len()).map(|i| { + bboxes + .get(i) + .map(|bbox| BBox::from_u32_arrow_array(bbox.u32().unwrap().data())) + })) + }; + let result = crop_images(self, &mut bboxes_iterator); + + image_array_from_img_buffers(self.name(), result.as_slice(), &Some(*self.image_mode())) + } + + fn resize_to_fixed_shape_image_array( + &self, + w: u32, + h: u32, + mode: &ImageMode, + ) -> DaftResult { + let result = resize_images(self, w, h); + fixed_image_array_from_img_buffers(self.name(), result.as_slice(), mode, h, w) + } + + fn to_mode(&self, mode: ImageMode) -> DaftResult + where + Self: Sized, + { + let buffers: Vec> = ImageBufferIter::new(self) + .map(|img| img.map(|img| img.into_mode(mode))) + .collect(); + + let (height, width) = match self.data_type() { + DataType::FixedShapeImage(_, h, w) => (h, w), + _ => unreachable!("self should always be a FixedShapeImage"), + }; + fixed_image_array_from_img_buffers(self.name(), &buffers, &mode, *height, *width) + } +} + +impl AsImageObj for ImageArray { + fn len(&self) -> usize { + ImageArray::len(self) + } + + fn name(&self) -> &str { + ImageArray::name(self) + } + + fn as_image_obj<'a>(&'a self, idx: usize) -> Option> { + assert!(idx < self.len()); + if !self.physical.is_valid(idx) { + return None; + } + + let da = self.data_array(); + let ca = self.channel_array(); + let ha = self.height_array(); + let wa = self.width_array(); + let ma = self.mode_array(); + + let offsets = da.offsets(); + + let start = *offsets.get(idx).unwrap() as usize; + let end = *offsets.get(idx + 1).unwrap() as usize; + + let values = da + .flat_child + .u8() + .unwrap() + .data() + .as_any() + .downcast_ref::() + .unwrap(); + let slice_data = Cow::Borrowed(&values.values().as_slice()[start..end] as &'a [u8]); + + let c = ca.value(idx); + let h = ha.value(idx); + let w = wa.value(idx); + let m: ImageMode = ImageMode::from_u8(ma.value(idx)).unwrap(); + assert_eq!(m.num_channels(), c); + let result = DaftImageBuffer::from_raw(&m, w, h, slice_data); + + assert_eq!(result.height(), h); + assert_eq!(result.width(), w); + Some(result) + } +} + +impl AsImageObj for FixedShapeImageArray { + fn len(&self) -> usize { + FixedShapeImageArray::len(self) + } + + fn name(&self) -> &str { + FixedShapeImageArray::name(self) + } + + fn as_image_obj<'a>(&'a self, idx: usize) -> Option> { + assert!(idx < self.len()); + if !self.physical.is_valid(idx) { + return None; + } + + match self.data_type() { + DataType::FixedShapeImage(mode, height, width) => { + let arrow_array = self.physical.flat_child.downcast::().unwrap().as_arrow(); + let num_channels = mode.num_channels(); + let size = height * width * num_channels as u32; + let start = idx * size as usize; + let end = (idx + 1) * size as usize; + let slice_data = Cow::Borrowed(&arrow_array.values().as_slice()[start..end] as &'a [u8]); + let result = DaftImageBuffer::from_raw(mode, *width, *height, slice_data); + + assert_eq!(result.height(), *height); + assert_eq!(result.width(), *width); + Some(result) + } + dt => panic!("FixedShapeImageArray should always have DataType::FixedShapeImage() as it's dtype, but got {}", dt), + } + } +} + +fn encode_images( + images: &Arr, + image_format: ImageFormat, +) -> DaftResult { + let arrow_array = match image_format { + ImageFormat::TIFF => { + // NOTE: A single writer/buffer can't be used for TIFF files because the encoder will overwrite the + // IFD offset for the first image instead of writing it for all subsequent images, producing corrupted + // TIFF files. We work around this by writing out a new buffer for each image. + // TODO(Clark): Fix this in the tiff crate. + let values = ImageBufferIter::new(images) + .map(|img| { + img.map(|img| { + let buf = Vec::new(); + let mut writer: CountingWriter> = + std::io::BufWriter::new(std::io::Cursor::new(buf)).into(); + img.encode(image_format, &mut writer)?; + // NOTE: BufWriter::into_inner() will flush the buffer. + Ok(writer + .into_inner() + .into_inner() + .map_err(|e| { + DaftError::ValueError(format!( + "Encoding image into file format {} failed: {}", + image_format, e + )) + })? + .into_inner()) + }) + .transpose() + }) + .collect::>>()?; + arrow2::array::BinaryArray::::from_iter(values) + } + _ => { + let mut offsets = Vec::with_capacity(images.len() + 1); + offsets.push(0i64); + let mut validity = arrow2::bitmap::MutableBitmap::with_capacity(images.len()); + let buf = Vec::new(); + let mut writer: CountingWriter> = + std::io::BufWriter::new(std::io::Cursor::new(buf)).into(); + ImageBufferIter::new(images) + .map(|img| { + match img { + Some(img) => { + img.encode(image_format, &mut writer)?; + offsets.push(writer.count() as i64); + validity.push(true); + } + None => { + offsets.push(*offsets.last().unwrap()); + validity.push(false); + } + } + Ok(()) + }) + .collect::>>()?; + // NOTE: BufWriter::into_inner() will flush the buffer. + let values = writer + .into_inner() + .into_inner() + .map_err(|e| { + DaftError::ValueError(format!( + "Encoding image into file format {} failed: {}", + image_format, e + )) + })? + .into_inner(); + let encoded_data: arrow2::buffer::Buffer = values.into(); + let offsets_buffer = arrow2::offset::OffsetsBuffer::try_from(offsets)?; + let validity: Option = match validity.unset_bits() { + 0 => None, + _ => Some(validity.into()), + }; + arrow2::array::BinaryArray::::new( + arrow2::datatypes::DataType::LargeBinary, + offsets_buffer, + encoded_data, + validity, + ) + } + }; + BinaryArray::new( + Field::new(images.name(), arrow_array.data_type().into()).into(), + arrow_array.boxed(), + ) +} + +fn resize_images(images: &Arr, w: u32, h: u32) -> Vec> { + ImageBufferIter::new(images) + .map(|img| img.map(|img| img.resize(w, h))) + .collect::>() +} + +fn crop_images<'a, Arr>( + images: &'a Arr, + bboxes: &mut dyn Iterator>, +) -> Vec>> +where + Arr: AsImageObj, +{ + ImageBufferIter::new(images) + .zip(bboxes) + .map(|(img, bbox)| match (img, bbox) { + (None, _) | (_, None) => None, + (Some(img), Some(bbox)) => Some(img.crop(&bbox)), + }) + .collect::>() +} + +pub fn image_html_value(arr: &ImageArray, idx: usize) -> String { + let maybe_image = arr.as_image_obj(idx); + let str_val = arr.str_value(idx).unwrap(); + + match maybe_image { + None => "None".to_string(), + Some(image) => { + let thumb = image.fit_to(128, 128); + let mut bytes: Vec = vec![]; + let mut writer = std::io::BufWriter::new(std::io::Cursor::new(&mut bytes)); + thumb.encode(ImageFormat::JPEG, &mut writer).unwrap(); + drop(writer); + format!( + "\"{}\"", + base64::engine::general_purpose::STANDARD.encode(&mut bytes), + str_val, + ) + } + } +} + +pub fn fixed_image_html_value(arr: &FixedShapeImageArray, idx: usize) -> String { + let maybe_image = arr.as_image_obj(idx); + let str_val = arr.str_value(idx).unwrap(); + + match maybe_image { + None => "None".to_string(), + Some(image) => { + let thumb = image.fit_to(128, 128); + let mut bytes: Vec = vec![]; + let mut writer = std::io::BufWriter::new(std::io::Cursor::new(&mut bytes)); + thumb.encode(ImageFormat::JPEG, &mut writer).unwrap(); + drop(writer); + format!( + "\"{}\"", + base64::engine::general_purpose::STANDARD.encode(&mut bytes), + str_val, + ) + } + } +} diff --git a/src/daft-image/src/python.rs b/src/daft-image/src/python.rs new file mode 100644 index 0000000000..99b93dd265 --- /dev/null +++ b/src/daft-image/src/python.rs @@ -0,0 +1,53 @@ +use daft_core::{ + prelude::{ImageFormat, ImageMode}, + python::PySeries, +}; +use pyo3::{exceptions::PyValueError, prelude::*}; + +#[pyfunction] +pub fn decode( + s: &PySeries, + raise_error_on_failure: bool, + mode: Option, +) -> PyResult { + let s = crate::series::decode(&s.series, raise_error_on_failure, mode)?; + Ok(s.into()) +} + +#[pyfunction] +pub fn encode(s: &PySeries, image_format: ImageFormat) -> PyResult { + let s = crate::series::encode(&s.series, image_format)?; + Ok(s.into()) +} + +#[pyfunction] +pub fn resize(s: &PySeries, w: i64, h: i64) -> PyResult { + if w < 0 { + return Err(PyValueError::new_err(format!( + "width can not be negative: {w}" + ))); + } + if h < 0 { + return Err(PyValueError::new_err(format!( + "height can not be negative: {h}" + ))); + } + let s = crate::series::resize(&s.series, w as u32, h as u32)?; + Ok(s.into()) +} + +#[pyfunction] +pub fn to_mode(s: &PySeries, mode: &ImageMode) -> PyResult { + let s = crate::series::to_mode(&s.series, *mode)?; + Ok(s.into()) +} + +pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { + let module = PyModule::new(_py, "image")?; + module.add_wrapped(wrap_pyfunction!(decode))?; + module.add_wrapped(wrap_pyfunction!(encode))?; + module.add_wrapped(wrap_pyfunction!(resize))?; + module.add_wrapped(wrap_pyfunction!(to_mode))?; + parent.add_submodule(module)?; + Ok(()) +} diff --git a/src/daft-image/src/series.rs b/src/daft-image/src/series.rs new file mode 100644 index 0000000000..7fe2255b7e --- /dev/null +++ b/src/daft-image/src/series.rs @@ -0,0 +1,204 @@ +use daft_core::prelude::*; + +use common_error::{DaftError, DaftResult}; + +use crate::{ + ops::{image_array_from_img_buffers, ImageOps}, + DaftImageBuffer, +}; +fn image_decode_impl( + ba: &BinaryArray, + raise_error_on_failure: bool, + mode: Option, +) -> DaftResult { + let arrow_array = ba + .data() + .as_any() + .downcast_ref::>() + .unwrap(); + let mut img_bufs = Vec::>::with_capacity(arrow_array.len()); + let mut cached_dtype: Option = None; + // Load images from binary buffers. + // Confirm that all images have the same value dtype. + for (index, row) in arrow_array.iter().enumerate() { + let mut img_buf = match row.map(DaftImageBuffer::decode).transpose() { + Ok(val) => val, + Err(err) => { + if raise_error_on_failure { + return Err(err); + } else { + log::warn!( + "Error occurred during image decoding at index: {index} {} (falling back to Null)", + err + ); + None + } + } + }; + if let Some(mode) = mode { + img_buf = img_buf.map(|buf| buf.into_mode(mode)); + } + let dtype = img_buf.as_ref().map(|im| im.mode().get_dtype()); + match (dtype.as_ref(), cached_dtype.as_ref()) { + (Some(t1), Some(t2)) => { + if t1 != t2 { + return Err(DaftError::ValueError(format!( + "All images in a column must have the same dtype, but got: {:?} and {:?}", + t1, t2 + ))); + } + } + (Some(t1), None) => { + cached_dtype = Some(t1.clone()); + } + (None, _) => {} + } + img_bufs.push(img_buf); + } + // Fall back to UInt8 dtype if series is all nulls. + let cached_dtype = cached_dtype.unwrap_or(DataType::UInt8); + match cached_dtype { + DataType::UInt8 => Ok(image_array_from_img_buffers(ba.name(), img_bufs.as_slice(), &mode)?), + _ => unimplemented!("Decoding images of dtype {cached_dtype:?} is not supported, only uint8 images are supported."), + } +} + +/// Decodes a series of binary data into image arrays. +/// +/// # Arguments +/// * `s` - Input Series containing binary image data +/// * `raise_error_on_failure` - If true, raises errors on decode failures +/// * `mode` - Optional target ImageMode for decoded images +/// +/// # Returns +/// A DaftResult containing a Series of decoded images +pub fn decode( + s: &Series, + raise_error_on_failure: bool, + mode: Option, +) -> DaftResult { + match s.data_type() { + DataType::Binary => image_decode_impl(s.binary()?, raise_error_on_failure, mode) + .map(|arr| arr.into_series()), + dtype => Err(DaftError::ValueError(format!( + "Decoding in-memory data into images is only supported for binary arrays, but got {}", + dtype + ))), + } +} + +/// Encode a series of images into a series of bytes +/// Encode a series of images into a series of bytes. +/// +/// This function takes a Series containing image data and an ImageFormat, +/// then encodes each image into the specified format. +/// +/// # Arguments +/// * `s` - The input Series containing image data +/// * `image_format` - The desired output format for the encoded images +/// +/// # Returns +/// A DaftResult containing a new Series of encoded binary data +pub fn encode(s: &Series, image_format: ImageFormat) -> DaftResult { + match s.data_type() { + DataType::Image(..) => Ok(s + .downcast::()? + .encode(image_format)? + .into_series()), + DataType::FixedShapeImage(..) => Ok(s + .downcast::()? + .encode(image_format)? + .into_series()), + dtype => Err(DaftError::ValueError(format!( + "Encoding images into bytes is only supported for image arrays, but got {}", + dtype + ))), + } +} +/// Resizes images in a Series to the specified width and height. +/// +/// # Arguments +/// * `s` - Input Series containing image data +/// * `w` - Target width for resized images +/// * `h` - Target height for resized images +/// +/// # Returns +/// A DaftResult containing a new Series with resized images +pub fn resize(s: &Series, w: u32, h: u32) -> DaftResult { + match s.data_type() { + DataType::Image(mode) => { + let array = s.downcast::()?; + match mode { + // If the image mode is specified at the type-level (and is therefore guaranteed to be consistent + // across all images across all partitions), store the resized image in a fixed shape image array, + // since we'll have homogeneous modes, heights, and widths after resizing. + Some(mode) => Ok(array + .resize_to_fixed_shape_image_array(w, h, mode)? + .into_series()), + None => Ok(array.resize(w, h)?.into_series()), + } + } + DataType::FixedShapeImage(..) => Ok(s + .downcast::()? + .resize(w, h)? + .into_series()), + _ => Err(DaftError::ValueError(format!( + "datatype: {} does not support Image Resize. Occurred while resizing Series: {}", + s.data_type(), + s.name() + ))), + } +} + +/// Crops images in a Series based on provided bounding boxes. +/// +/// # Arguments +/// * `s` - Input Series containing image data +/// * `bbox` - Series of bounding boxes for cropping +/// +/// # Returns +/// A DaftResult containing a new Series with cropped images +pub fn crop(s: &Series, bbox: &Series) -> DaftResult { + let bbox_type = DataType::FixedSizeList(Box::new(DataType::UInt32), 4); + let bbox = bbox.cast(&bbox_type)?; + let bbox = bbox.fixed_size_list()?; + + match &s.data_type() { + DataType::Image(_) => s + .downcast::()? + .crop(bbox) + .map(|arr| arr.into_series()), + DataType::FixedShapeImage(..) => s + .fixed_size_image()? + .crop(bbox) + .map(|arr| arr.into_series()), + dt => Err(DaftError::ValueError(format!( + "Expected input to crop to be an Image type, but received: {}", + dt + ))), + } +} +/// Converts images in a Series to the specified mode. +/// +/// # Arguments +/// * `s` - Input Series containing image data +/// * `mode` - Target ImageMode for conversion +/// +/// # Returns +/// A DaftResult containing a new Series with converted images +pub fn to_mode(s: &Series, mode: ImageMode) -> DaftResult { + match &s.data_type() { + DataType::Image(_) => s + .downcast::()? + .to_mode(mode) + .map(|arr| arr.into_series()), + DataType::FixedShapeImage(..) => s + .fixed_size_image()? + .to_mode(mode) + .map(|arr| arr.into_series()), + dt => Err(DaftError::ValueError(format!( + "Expected input to crop to be an Image type, but received: {}", + dt + ))), + } +} diff --git a/src/daft-table/Cargo.toml b/src/daft-table/Cargo.toml index 0b42463974..02326764e3 100644 --- a/src/daft-table/Cargo.toml +++ b/src/daft-table/Cargo.toml @@ -6,6 +6,7 @@ common-display = {path = "../common/display", default-features = false} common-error = {path = "../common/error", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} +daft-image = {path = "../daft-image", default-features = false} html-escape = {workspace = true} num-traits = {workspace = true} pyo3 = {workspace = true, optional = true} @@ -13,7 +14,7 @@ rand = {workspace = true} serde = {workspace = true} [features] -python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "common-arrow-ffi/python", "common-display/python"] +python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "common-arrow-ffi/python", "common-display/python", "daft-image/python"] [package] edition = {workspace = true} diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs index 02052109a6..d8c4ba4fca 100644 --- a/src/daft-table/src/lib.rs +++ b/src/daft-table/src/lib.rs @@ -23,6 +23,7 @@ pub mod ffi; mod growable; mod ops; mod probe_table; +mod repr_html; pub use growable::GrowableTable; @@ -32,6 +33,7 @@ pub use probe_table::{ProbeTable, ProbeTableBuilder}; pub mod python; #[cfg(feature = "python")] pub use python::register_modules; +use repr_html::html_value; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Table { @@ -713,7 +715,7 @@ impl Table { for col in self.columns.iter() { res.push_str(styled_td); - res.push_str(&col.html_value(i)); + res.push_str(&html_value(col, i)); res.push_str(""); } @@ -735,7 +737,7 @@ impl Table { for col in self.columns.iter() { res.push_str(styled_td); - res.push_str(&col.html_value(i)); + res.push_str(&html_value(col, i)); res.push_str(""); } diff --git a/src/daft-table/src/repr_html.rs b/src/daft-table/src/repr_html.rs new file mode 100644 index 0000000000..aaabc6efa8 --- /dev/null +++ b/src/daft-table/src/repr_html.rs @@ -0,0 +1,139 @@ +use daft_core::datatypes::ExtensionArray; +use daft_core::{prelude::DataType, series::Series}; + +pub fn html_value(s: &Series, idx: usize) -> String { + match s.data_type() { + DataType::Image(_) => { + let arr = s.image().unwrap(); + daft_image::ops::image_html_value(arr, idx) + } + DataType::Null => { + let arr = s.null().unwrap(); + arr.html_value(idx) + } + DataType::Boolean => { + let arr = s.bool().unwrap(); + arr.html_value(idx) + } + DataType::Int8 => { + let arr = s.i8().unwrap(); + arr.html_value(idx) + } + DataType::Int16 => { + let arr = s.i16().unwrap(); + arr.html_value(idx) + } + DataType::Int32 => { + let arr = s.i32().unwrap(); + arr.html_value(idx) + } + DataType::Int64 => { + let arr = s.i64().unwrap(); + arr.html_value(idx) + } + DataType::Int128 => { + let arr = s.i128().unwrap(); + arr.html_value(idx) + } + DataType::UInt8 => { + let arr = s.u8().unwrap(); + arr.html_value(idx) + } + DataType::UInt16 => { + let arr = s.u16().unwrap(); + arr.html_value(idx) + } + DataType::UInt32 => { + let arr = s.u32().unwrap(); + arr.html_value(idx) + } + DataType::UInt64 => { + let arr = s.u64().unwrap(); + arr.html_value(idx) + } + DataType::Float32 => { + let arr = s.f32().unwrap(); + arr.html_value(idx) + } + DataType::Float64 => { + let arr = s.f64().unwrap(); + arr.html_value(idx) + } + DataType::Decimal128(_, _) => { + let arr = s.decimal128().unwrap(); + arr.html_value(idx) + } + DataType::Timestamp(_, _) => { + let arr = s.timestamp().unwrap(); + arr.html_value(idx) + } + DataType::Date => { + let arr = s.date().unwrap(); + arr.html_value(idx) + } + DataType::Time(_) => { + let arr = s.time().unwrap(); + arr.html_value(idx) + } + DataType::Duration(_) => { + let arr = s.duration().unwrap(); + arr.html_value(idx) + } + DataType::Binary => { + let arr = s.binary().unwrap(); + arr.html_value(idx) + } + DataType::FixedSizeBinary(_) => { + let arr = s.fixed_size_binary().unwrap(); + arr.html_value(idx) + } + DataType::Utf8 => { + let arr = s.utf8().unwrap(); + arr.html_value(idx) + } + DataType::FixedSizeList(_, _) => { + let arr = s.fixed_size_list().unwrap(); + arr.html_value(idx) + } + DataType::List(_) => { + let arr = s.list().unwrap(); + arr.html_value(idx) + } + DataType::Struct(_) => { + let arr = s.struct_().unwrap(); + arr.html_value(idx) + } + DataType::Map(_) => { + let arr = s.map().unwrap(); + arr.html_value(idx) + } + DataType::Extension(_, _, _) => { + let arr = s.downcast::().unwrap(); + arr.html_value(idx) + } + DataType::Embedding(_, _) => { + let arr = s.embedding().unwrap(); + arr.html_value(idx) + } + DataType::FixedShapeImage(_, _, _) => { + let arr = s.fixed_size_image().unwrap(); + daft_image::ops::fixed_image_html_value(arr, idx) + } + DataType::Tensor(_) => { + let arr = s.tensor().unwrap(); + arr.html_value(idx) + } + DataType::FixedShapeTensor(_, _) => { + let arr = s.fixed_shape_tensor().unwrap(); + arr.html_value(idx) + } + #[cfg(feature = "python")] + DataType::Python => { + let arr = s.python().unwrap(); + arr.html_value(idx) + } + DataType::Unknown => { + panic!("Unknown data type") + } + } +} diff --git a/src/lib.rs b/src/lib.rs index b476e1b251..79816562f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,33 +94,34 @@ pub mod pylib { } #[pymodule] - fn daft(_py: Python<'_>, m: &PyModule) -> PyResult<()> { - refresh_logger(_py)?; + fn daft(py: Python<'_>, m: &PyModule) -> PyResult<()> { + refresh_logger(py)?; init_tracing(crate::should_enable_chrome_trace()); - common_daft_config::register_modules(_py, m)?; - common_system_info::register_modules(_py, m)?; - common_resource_request::register_modules(_py, m)?; - common_file_formats::python::register_modules(_py, m)?; - daft_core::register_modules(_py, m)?; - daft_core::python::register_modules(_py, m)?; - daft_local_execution::register_modules(_py, m)?; - daft_dsl::register_modules(_py, m)?; - daft_table::register_modules(_py, m)?; - daft_io::register_modules(_py, m)?; - daft_parquet::register_modules(_py, m)?; - daft_csv::register_modules(_py, m)?; - daft_json::register_modules(_py, m)?; - daft_plan::register_modules(_py, m)?; - daft_micropartition::register_modules(_py, m)?; - daft_scan::register_modules(_py, m)?; - daft_scheduler::register_modules(_py, m)?; - daft_sql::register_modules(_py, m)?; - daft_functions::register_modules(_py, m)?; + common_daft_config::register_modules(py, m)?; + common_system_info::register_modules(py, m)?; + common_resource_request::register_modules(py, m)?; + common_file_formats::python::register_modules(py, m)?; + daft_core::register_modules(py, m)?; + daft_core::python::register_modules(py, m)?; + daft_local_execution::register_modules(py, m)?; + daft_dsl::register_modules(py, m)?; + daft_table::register_modules(py, m)?; + daft_io::register_modules(py, m)?; + daft_parquet::register_modules(py, m)?; + daft_csv::register_modules(py, m)?; + daft_json::register_modules(py, m)?; + daft_plan::register_modules(py, m)?; + daft_micropartition::register_modules(py, m)?; + daft_scan::register_modules(py, m)?; + daft_scheduler::register_modules(py, m)?; + daft_sql::register_modules(py, m)?; + daft_functions::register_modules(py, m)?; m.add_wrapped(wrap_pyfunction!(version))?; m.add_wrapped(wrap_pyfunction!(build_type))?; m.add_wrapped(wrap_pyfunction!(refresh_logger))?; m.add_wrapped(wrap_pyfunction!(get_max_log_level))?; + daft_image::python::register_modules(py, m)?; Ok(()) } }