Skip to content

Commit

Permalink
AVRO-4032: [Rust] Make it possible to set custom compression level fo…
Browse files Browse the repository at this point in the history
…r all codec that support it (#3095)

* AVRO-4032: add zstd variant that allows for level config

* add docs

* manual impl for IntoStaticStr to handle 2 variants with same value

* AVRO-4032: [Rust] Add codec settings for Bzip2

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-4032: [Rust] Add settings for Xz codec

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* add zstd settings

* make our new settings structs visible

* AVRO-4032: [Rust] Add support for Zstandard dictionary

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-4032: [Rust] Store the codec compression_level in the header metadata

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-4032: [Rust] Minor cleanups

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-4032: Fix the build

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-4032: Add tests for Codec::Null and ::Deflate

This way there is no need to enable other codecs when running unrelated tests.
Otherwise the build fails with:
```
error: function `avro_4032_codec_settings` is never used
  --> avro/tests/codecs.rs:50:4
   |
50 | fn avro_4032_codec_settings(codec: Codec) -> TestResult {
   |    ^^^^^^^^^^^^^^^^^^^^^^^^
```

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

---------

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
MarcoLugo and martin-g committed Aug 15, 2024
1 parent 38321e6 commit 9379fdb
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 45 deletions.
157 changes: 120 additions & 37 deletions lang/rust/avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,6 @@ use libflate::deflate::{Decoder, Encoder};
use std::io::{Read, Write};
use strum_macros::{EnumIter, EnumString, IntoStaticStr};

#[cfg(feature = "bzip")]
use bzip2::{
read::{BzDecoder, BzEncoder},
Compression,
};
#[cfg(feature = "snappy")]
extern crate crc32fast;
#[cfg(feature = "snappy")]
use crc32fast::Hasher;
#[cfg(feature = "xz")]
use xz2::read::{XzDecoder, XzEncoder};

/// The compression codec used to compress blocks.
#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
Expand All @@ -49,15 +37,16 @@ pub enum Codec {
/// CRC32 checksum of the uncompressed data in the block.
Snappy,
#[cfg(feature = "zstandard")]
Zstandard,
/// The `Zstandard` codec uses Facebook's [Zstandard](https://facebook.github.io/zstd/)
Zstandard(zstandard::ZstandardSettings),
#[cfg(feature = "bzip")]
/// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
/// compression library.
Bzip2,
Bzip2(bzip::Bzip2Settings),
#[cfg(feature = "xz")]
/// The `Xz` codec uses [Xz utils](https://tukaani.org/xz/)
/// compression library.
Xz,
Xz(xz::XzSettings),
}

impl From<Codec> for Value {
Expand Down Expand Up @@ -87,7 +76,7 @@ impl Codec {
.compress(&stream[..], &mut encoded[..])
.map_err(Error::SnappyCompress)?;

let mut hasher = Hasher::new();
let mut hasher = crc32fast::Hasher::new();
hasher.update(&stream[..]);
let checksum = hasher.finalize();
let checksum_as_bytes = checksum.to_be_bytes();
Expand All @@ -98,22 +87,26 @@ impl Codec {
*stream = encoded;
}
#[cfg(feature = "zstandard")]
Codec::Zstandard => {
let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
Codec::Zstandard(settings) => {
let mut encoder =
zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap();
encoder.write_all(stream).map_err(Error::ZstdCompress)?;
*stream = encoder.finish().unwrap();
}
#[cfg(feature = "bzip")]
Codec::Bzip2 => {
let mut encoder = BzEncoder::new(&stream[..], Compression::best());
Codec::Bzip2(settings) => {
use bzip2::read::BzEncoder;

let mut encoder = BzEncoder::new(&stream[..], settings.compression());
let mut buffer = Vec::new();
encoder.read_to_end(&mut buffer).unwrap();
*stream = buffer;
}
#[cfg(feature = "xz")]
Codec::Xz => {
let compression_level = 9;
let mut encoder = XzEncoder::new(&stream[..], compression_level);
Codec::Xz(settings) => {
use xz2::read::XzEncoder;

let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
let mut buffer = Vec::new();
encoder.read_to_end(&mut buffer).unwrap();
*stream = buffer;
Expand Down Expand Up @@ -148,7 +141,7 @@ impl Codec {
last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
let expected: u32 = u32::from_be_bytes(last_four);

let mut hasher = Hasher::new();
let mut hasher = crc32fast::Hasher::new();
hasher.update(&decoded);
let actual = hasher.finalize();

Expand All @@ -158,21 +151,30 @@ impl Codec {
decoded
}
#[cfg(feature = "zstandard")]
Codec::Zstandard => {
Codec::Zstandard(_settings) => {
use std::io::BufReader;
use zstd::zstd_safe;

let mut decoded = Vec::new();
let mut decoder = zstd::Decoder::new(&stream[..]).unwrap();
let buffer_size = zstd_safe::DCtx::in_size();
let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
let mut decoder = zstd::Decoder::new(buffer).unwrap();
std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
decoded
}
#[cfg(feature = "bzip")]
Codec::Bzip2 => {
Codec::Bzip2(_) => {
use bzip2::read::BzDecoder;

let mut decoder = BzDecoder::new(&stream[..]);
let mut decoded = Vec::new();
decoder.read_to_end(&mut decoded).unwrap();
decoded
}
#[cfg(feature = "xz")]
Codec::Xz => {
Codec::Xz(_) => {
use xz2::read::XzDecoder;

let mut decoder = XzDecoder::new(&stream[..]);
let mut decoded: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decoded).unwrap();
Expand All @@ -183,6 +185,72 @@ impl Codec {
}
}

#[cfg(feature = "bzip")]
pub mod bzip {
use bzip2::Compression;

#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Bzip2Settings {
pub compression_level: u8,
}

impl Bzip2Settings {
pub fn new(compression_level: u8) -> Self {
Self { compression_level }
}

pub(crate) fn compression(&self) -> Compression {
Compression::new(self.compression_level as u32)
}
}

impl Default for Bzip2Settings {
fn default() -> Self {
Bzip2Settings::new(Compression::best().level() as u8)
}
}
}

#[cfg(feature = "zstandard")]
pub mod zstandard {
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct ZstandardSettings {
pub compression_level: u8,
}

impl ZstandardSettings {
pub fn new(compression_level: u8) -> Self {
Self { compression_level }
}
}

impl Default for ZstandardSettings {
fn default() -> Self {
Self::new(0)
}
}
}

#[cfg(feature = "xz")]
pub mod xz {
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct XzSettings {
pub compression_level: u8,
}

impl XzSettings {
pub fn new(compression_level: u8) -> Self {
Self { compression_level }
}
}

impl Default for XzSettings {
fn default() -> Self {
XzSettings::new(9)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -216,19 +284,19 @@ mod tests {
#[cfg(feature = "zstandard")]
#[test]
fn zstd_compress_and_decompress() -> TestResult {
compress_and_decompress(Codec::Zstandard)
compress_and_decompress(Codec::Zstandard(zstandard::ZstandardSettings::default()))
}

#[cfg(feature = "bzip")]
#[test]
fn bzip_compress_and_decompress() -> TestResult {
compress_and_decompress(Codec::Bzip2)
compress_and_decompress(Codec::Bzip2(bzip::Bzip2Settings::default()))
}

#[cfg(feature = "xz")]
#[test]
fn xz_compress_and_decompress() -> TestResult {
compress_and_decompress(Codec::Xz)
compress_and_decompress(Codec::Xz(xz::XzSettings::default()))
}

fn compress_and_decompress(codec: Codec) -> TestResult {
Expand All @@ -250,13 +318,19 @@ mod tests {
assert_eq!(<&str>::from(Codec::Snappy), "snappy");

#[cfg(feature = "zstandard")]
assert_eq!(<&str>::from(Codec::Zstandard), "zstandard");
assert_eq!(
<&str>::from(Codec::Zstandard(zstandard::ZstandardSettings::default())),
"zstandard"
);

#[cfg(feature = "bzip")]
assert_eq!(<&str>::from(Codec::Bzip2), "bzip2");
assert_eq!(
<&str>::from(Codec::Bzip2(bzip::Bzip2Settings::default())),
"bzip2"
);

#[cfg(feature = "xz")]
assert_eq!(<&str>::from(Codec::Xz), "xz");
assert_eq!(<&str>::from(Codec::Xz(xz::XzSettings::default())), "xz");
}

#[test]
Expand All @@ -270,13 +344,22 @@ mod tests {
assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy);

#[cfg(feature = "zstandard")]
assert_eq!(Codec::from_str("zstandard").unwrap(), Codec::Zstandard);
assert_eq!(
Codec::from_str("zstandard").unwrap(),
Codec::Zstandard(zstandard::ZstandardSettings::default())
);

#[cfg(feature = "bzip")]
assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2);
assert_eq!(
Codec::from_str("bzip2").unwrap(),
Codec::Bzip2(bzip::Bzip2Settings::default())
);

#[cfg(feature = "xz")]
assert_eq!(Codec::from_str("xz").unwrap(), Codec::Xz);
assert_eq!(
Codec::from_str("xz").unwrap(),
Codec::Xz(xz::XzSettings::default())
);

assert!(Codec::from_str("not a codec").is_err());
}
Expand Down
6 changes: 6 additions & 0 deletions lang/rust/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,12 @@ pub use crate::{
serde_avro_slice, serde_avro_slice_opt,
},
};
#[cfg(feature = "bzip")]
pub use codec::bzip::Bzip2Settings;
#[cfg(feature = "xz")]
pub use codec::xz::XzSettings;
#[cfg(feature = "zstandard")]
pub use codec::zstandard::ZstandardSettings;
pub use codec::Codec;
pub use de::from_value;
pub use decimal::Decimal;
Expand Down
50 changes: 42 additions & 8 deletions lang/rust/avro/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ impl<'r, R: Read> Block<'r, R> {
/// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
/// its content.
fn read_header(&mut self) -> AvroResult<()> {
let meta_schema = Schema::map(Schema::Bytes);

let mut buf = [0u8; 4];
self.reader
.read_exact(&mut buf)
Expand All @@ -86,12 +84,16 @@ impl<'r, R: Read> Block<'r, R> {
return Err(Error::HeaderMagic);
}

let meta_schema = Schema::map(Schema::Bytes);
if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
self.read_writer_schema(&metadata)?;
self.codec = read_codec(&metadata)?;

for (key, value) in metadata {
if key == "avro.schema" || key == "avro.codec" {
if key == "avro.schema"
|| key == "avro.codec"
|| key == "avro.codec.compression_level"
{
// already processed
} else if key.starts_with("avro.") {
warn!("Ignoring unknown metadata key: {}", key);
Expand Down Expand Up @@ -262,16 +264,48 @@ fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
})
.map(|codec_res| match codec_res {
Ok(codec) => match Codec::from_str(codec) {
Ok(codec) => Ok(codec),
Ok(codec) => match codec {
#[cfg(feature = "bzip")]
Codec::Bzip2(_) => {
use crate::Bzip2Settings;
if let Some(Value::Bytes(bytes)) =
metadata.get("avro.codec.compression_level")
{
Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
} else {
Ok(codec)
}
}
#[cfg(feature = "xz")]
Codec::Xz(_) => {
use crate::XzSettings;
if let Some(Value::Bytes(bytes)) =
metadata.get("avro.codec.compression_level")
{
Ok(Codec::Xz(XzSettings::new(bytes[0])))
} else {
Ok(codec)
}
}
#[cfg(feature = "zstandard")]
Codec::Zstandard(_) => {
use crate::ZstandardSettings;
if let Some(Value::Bytes(bytes)) =
metadata.get("avro.codec.compression_level")
{
Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
} else {
Ok(codec)
}
}
_ => Ok(codec),
},
Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
},
Err(err) => Err(err),
});

match result {
Some(res) => res,
None => Ok(Codec::Null),
}
result.unwrap_or_else(|| Ok(Codec::Null))
}

/// Main interface for reading Avro formatted values.
Expand Down
24 changes: 24 additions & 0 deletions lang/rust/avro/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,30 @@ impl<'a, W: Write> Writer<'a, W> {
let mut metadata = HashMap::with_capacity(2);
metadata.insert("avro.schema", Value::Bytes(schema_bytes));
metadata.insert("avro.codec", self.codec.into());
match self.codec {
#[cfg(feature = "bzip")]
Codec::Bzip2(settings) => {
metadata.insert(
"avro.codec.compression_level",
Value::Bytes(vec![settings.compression_level]),
);
}
#[cfg(feature = "xz")]
Codec::Xz(settings) => {
metadata.insert(
"avro.codec.compression_level",
Value::Bytes(vec![settings.compression_level]),
);
}
#[cfg(feature = "zstandard")]
Codec::Zstandard(settings) => {
metadata.insert(
"avro.codec.compression_level",
Value::Bytes(vec![settings.compression_level]),
);
}
_ => {}
}

for (k, v) in &self.user_metadata {
metadata.insert(k.as_str(), v.clone());
Expand Down
Loading

0 comments on commit 9379fdb

Please sign in to comment.