diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go index 1ebceaca234c1..b623bd5074d02 100644 --- a/go/parquet/file/column_reader.go +++ b/go/parquet/file/column_reader.go @@ -17,6 +17,7 @@ package file import ( + "errors" "fmt" "sync" @@ -345,6 +346,11 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error { c.curDecoder = decoder } else { switch encoding { + case format.Encoding_RLE: + if c.descr.PhysicalType() != parquet.Types.Boolean { + return fmt.Errorf("parquet: only boolean supports RLE encoding, got %s", c.descr.PhysicalType()) + } + fallthrough case format.Encoding_PLAIN, format.Encoding_DELTA_BYTE_ARRAY, format.Encoding_DELTA_LENGTH_BYTE_ARRAY, @@ -352,7 +358,7 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error { c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem) c.decoders[encoding] = c.curDecoder case format.Encoding_RLE_DICTIONARY: - return xerrors.New("parquet: dictionary page must be before data page") + return errors.New("parquet: dictionary page must be before data page") case format.Encoding_BYTE_STREAM_SPLIT: return fmt.Errorf("parquet: unsupported data encoding %s", encoding) default: diff --git a/go/parquet/file/file_reader_test.go b/go/parquet/file/file_reader_test.go index eccb572b30040..2a9b097139062 100644 --- a/go/parquet/file/file_reader_test.go +++ b/go/parquet/file/file_reader_test.go @@ -21,6 +21,8 @@ import ( "crypto/rand" "encoding/binary" "io" + "os" + "path" "testing" "github.com/apache/arrow/go/v14/arrow/memory" @@ -385,3 +387,62 @@ func TestDeltaLengthByteArrayPackingWithNulls(t *testing.T) { assert.NotNil(t, readData[0]) } } + +func TestRleBooleanEncodingFileRead(t *testing.T) { + dir := os.Getenv("PARQUET_TEST_DATA") + if dir == "" { + t.Skip("no path supplied with PARQUET_TEST_DATA") + } + assert.DirExists(t, dir) + + props := parquet.NewReaderProperties(memory.DefaultAllocator) + fileReader, err := file.OpenParquetFile(path.Join(dir, "rle_boolean_encoding.parquet"), + false, file.WithReadProps(props)) + require.NoError(t, err) + defer fileReader.Close() + + assert.Equal(t, 1, fileReader.NumRowGroups()) + rgr := fileReader.RowGroup(0) + assert.EqualValues(t, 68, rgr.NumRows()) + + rdr, err := rgr.Column(0) + require.NoError(t, err) + brdr := rdr.(*file.BooleanColumnChunkReader) + + values := make([]bool, 68) + defLvls, repLvls := make([]int16, 68), make([]int16, 68) + total, read, err := brdr.ReadBatch(68, values, defLvls, repLvls) + require.NoError(t, err) + + assert.EqualValues(t, 68, total) + md, err := rgr.MetaData().ColumnChunk(0) + require.NoError(t, err) + stats, err := md.Statistics() + require.NoError(t, err) + assert.EqualValues(t, total-stats.NullCount(), read) + + expected := []bool{ + true, false, true, true, false, false, + true, true, true, false, false, true, true, + false, true, true, false, false, true, true, + false, true, true, false, false, true, true, + true, false, false, false, false, true, true, + false, true, true, false, false, true, true, + true, false, false, true, true, false, false, + true, true, true, false, true, true, false, + true, true, false, false, true, true, true, + } + expectedNulls := []int{2, 15, 23, 38, 48, 60} + + expectedNullIdx := 0 + for i, v := range defLvls { + if expectedNullIdx < len(expectedNulls) && i == expectedNulls[expectedNullIdx] { + assert.Zero(t, v) + expectedNullIdx++ + } else { + assert.EqualValues(t, 1, v) + } + } + + assert.Equal(t, expected, values[:len(expected)]) +} diff --git a/go/parquet/internal/encoding/boolean_decoder.go b/go/parquet/internal/encoding/boolean_decoder.go index dd213395d6324..337a6db967a2e 100644 --- a/go/parquet/internal/encoding/boolean_decoder.go +++ b/go/parquet/internal/encoding/boolean_decoder.go @@ -17,11 +17,16 @@ package encoding import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "github.com/apache/arrow/go/v14/arrow/bitutil" shared_utils "github.com/apache/arrow/go/v14/internal/utils" "github.com/apache/arrow/go/v14/parquet" "github.com/apache/arrow/go/v14/parquet/internal/utils" - "golang.org/x/xerrors" ) // PlainBooleanDecoder is for the Plain Encoding type, there is no @@ -103,7 +108,80 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBit return 0, err } if valuesRead != toRead { - return valuesRead, xerrors.New("parquet: boolean decoder: number of values / definition levels read did not match") + return valuesRead, errors.New("parquet: boolean decoder: number of values / definition levels read did not match") + } + return spacedExpand(out, nullCount, validBits, validBitsOffset), nil + } + return dec.Decode(out) +} + +type RleBooleanDecoder struct { + decoder + + rleDec *utils.RleDecoder +} + +func (RleBooleanDecoder) Type() parquet.Type { + return parquet.Types.Boolean +} + +func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error { + dec.nvals = nvals + + if len(data) < 4 { + return fmt.Errorf("invalid length - %d (corrupt data page?)", len(data)) + } + + // load the first 4 bytes in little-endian which indicates the length + nbytes := binary.LittleEndian.Uint32(data[:4]) + if nbytes > uint32(len(data)-4) { + return fmt.Errorf("received invalid number of bytes - %d (corrupt data page?)", nbytes) + } + + dec.data = data[4:] + if dec.rleDec == nil { + dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1) + } else { + dec.rleDec.Reset(bytes.NewReader(dec.data), 1) + } + return nil +} + +func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) { + max := shared_utils.MinInt(len(out), dec.nvals) + + var ( + buf [1024]uint64 + n = max + ) + + for n > 0 { + batch := shared_utils.MinInt(len(buf), n) + decoded := dec.rleDec.GetBatch(buf[:batch]) + if decoded != batch { + return max - n, io.ErrUnexpectedEOF + } + + for i := 0; i < batch; i++ { + out[i] = buf[i] != 0 + } + n -= batch + out = out[batch:] + } + + dec.nvals -= max + return max, nil +} + +func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { + if nullCount > 0 { + toRead := len(out) - nullCount + valuesRead, err := dec.Decode(out[:toRead]) + if err != nil { + return 0, err + } + if valuesRead != toRead { + return valuesRead, errors.New("parquet: rle boolean decoder: number of values / definition levels read did not match") } return spacedExpand(out, nullCount, validBits, validBitsOffset), nil } diff --git a/go/parquet/internal/encoding/boolean_encoder.go b/go/parquet/internal/encoding/boolean_encoder.go index 65ba2658b0637..3970e05fca289 100644 --- a/go/parquet/internal/encoding/boolean_encoder.go +++ b/go/parquet/internal/encoding/boolean_encoder.go @@ -17,8 +17,11 @@ package encoding import ( + "encoding/binary" + "github.com/apache/arrow/go/v14/arrow/bitutil" "github.com/apache/arrow/go/v14/parquet" + "github.com/apache/arrow/go/v14/parquet/internal/debug" "github.com/apache/arrow/go/v14/parquet/internal/utils" ) @@ -87,3 +90,55 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer, error) { return enc.sink.Finish(), nil } + +const rleLengthInBytes = 4 + +type RleBooleanEncoder struct { + encoder + + bufferedValues []bool +} + +func (RleBooleanEncoder) Type() parquet.Type { + return parquet.Types.Boolean +} + +func (enc *RleBooleanEncoder) Put(in []bool) { + enc.bufferedValues = append(enc.bufferedValues, in...) +} + +func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte, validBitsOffset int64) { + bufferOut := make([]bool, len(in)) + nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset) + enc.Put(bufferOut[:nvalid]) +} + +func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 { + return rleLengthInBytes + int64(enc.maxRleBufferSize()) +} + +func (enc *RleBooleanEncoder) maxRleBufferSize() int { + return utils.MaxRLEBufferSize(1, len(enc.bufferedValues)) + + utils.MinRLEBufferSize(1) +} + +func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) { + rleBufferSizeMax := enc.maxRleBufferSize() + enc.sink.SetOffset(rleLengthInBytes) + enc.sink.Reserve(rleBufferSizeMax) + + rleEncoder := utils.NewRleEncoder(enc.sink, 1) + for _, v := range enc.bufferedValues { + if v { + rleEncoder.Put(1) + } else { + rleEncoder.Put(0) + } + } + n := rleEncoder.Flush() + debug.Assert(n <= rleBufferSizeMax, "num encoded bytes larger than expected max") + buf := enc.sink.Finish() + binary.LittleEndian.PutUint32(buf.Bytes(), uint32(n)) + + return buf, nil +} diff --git a/go/parquet/internal/encoding/encoder.go b/go/parquet/internal/encoding/encoder.go index 9626e4e9ff97f..f6b57fe63c433 100644 --- a/go/parquet/internal/encoding/encoder.go +++ b/go/parquet/internal/encoding/encoder.go @@ -244,7 +244,7 @@ func (d *dictEncoder) FlushValues() (Buffer, error) { // EstimatedDataEncodedSize returns the maximum number of bytes needed to store the RLE encoded indexes, not including the // dictionary index in the computation. func (d *dictEncoder) EstimatedDataEncodedSize() int64 { - return 1 + int64(utils.MaxBufferSize(d.BitWidth(), len(d.idxValues))+utils.MinBufferSize(d.BitWidth())) + return 1 + int64(utils.MaxRLEBufferSize(d.BitWidth(), len(d.idxValues))+utils.MinRLEBufferSize(d.BitWidth())) } // NumEntries returns the number of entires in the dictionary index for this encoder. diff --git a/go/parquet/internal/encoding/encoding_test.go b/go/parquet/internal/encoding/encoding_test.go index 50e72de004e19..b0d86321e09f1 100644 --- a/go/parquet/internal/encoding/encoding_test.go +++ b/go/parquet/internal/encoding/encoding_test.go @@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() { b.checkRoundTrip(parquet.Encodings.Plain) } +func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() { + switch b.typ { + case reflect.TypeOf(true): + b.initData(2000, 200) + b.checkRoundTrip(parquet.Encodings.RLE) + default: + b.T().SkipNow() + } +} + func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() { b.initData(10000, 1) @@ -408,6 +418,8 @@ func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() { if validBits != nil { b.checkRoundTripSpaced(parquet.Encodings.Plain, validBits, validBitsOffset) switch b.typ { + case reflect.TypeOf(false): + b.checkRoundTripSpaced(parquet.Encodings.RLE, validBits, validBitsOffset) case reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)): b.checkRoundTripSpaced(parquet.Encodings.DeltaBinaryPacked, validBits, validBitsOffset) case reflect.TypeOf(parquet.ByteArray{}): diff --git a/go/parquet/internal/encoding/levels.go b/go/parquet/internal/encoding/levels.go index c5622519b0ce4..e04ec19d5473d 100644 --- a/go/parquet/internal/encoding/levels.go +++ b/go/parquet/internal/encoding/levels.go @@ -48,7 +48,7 @@ func LevelEncodingMaxBufferSize(encoding parquet.Encoding, maxLvl int16, nbuffer nbytes := 0 switch encoding { case parquet.Encodings.RLE: - nbytes = utils.MaxBufferSize(bitWidth, nbuffered) + utils.MinBufferSize(bitWidth) + nbytes = utils.MaxRLEBufferSize(bitWidth, nbuffered) + utils.MinRLEBufferSize(bitWidth) case parquet.Encodings.BitPacked: nbytes = int(bitutil.BytesForBits(int64(nbuffered * bitWidth))) default: diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go b/go/parquet/internal/encoding/typed_encoder.gen.go index 7c1f954632f06..411e87c17e7d5 100644 --- a/go/parquet/internal/encoding/typed_encoder.gen.go +++ b/go/parquet/internal/encoding/typed_encoder.gen.go @@ -1225,6 +1225,8 @@ func (boolEncoderTraits) Encoder(e format.Encoding, useDict bool, descr *schema. switch e { case format.Encoding_PLAIN: return &PlainBooleanEncoder{encoder: newEncoderBase(e, descr, mem)} + case format.Encoding_RLE: + return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, mem)} default: panic("unimplemented encoding type") } @@ -1248,6 +1250,8 @@ func (boolDecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, useDi switch e { case parquet.Encodings.Plain: return &PlainBooleanDecoder{decoder: newDecoderBase(format.Encoding(e), descr)} + case parquet.Encodings.RLE: + return &RleBooleanDecoder{decoder: newDecoderBase(format.Encoding(e), descr)} default: panic("unimplemented encoding type") } diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl index f0d4fb50ae2fc..69415ccca4a26 100644 --- a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl +++ b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl @@ -73,6 +73,10 @@ func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *s switch e { case format.Encoding_PLAIN: return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)} +{{- if eq .Name "Boolean" }} + case format.Encoding_RLE: + return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, mem)} +{{- end}} {{- if or (eq .Name "Int32") (eq .Name "Int64")}} case format.Encoding_DELTA_BINARY_PACKED: return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{ @@ -117,6 +121,10 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, switch e { case parquet.Encodings.Plain: return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), descr)} +{{- if eq .Name "Boolean" }} + case parquet.Encodings.RLE: + return &RleBooleanDecoder{decoder: newDecoderBase(format.Encoding(e), descr)} +{{- end}} {{- if or (eq .Name "Int32") (eq .Name "Int64")}} case parquet.Encodings.DeltaBinaryPacked: if mem == nil { diff --git a/go/parquet/internal/testutils/random.go b/go/parquet/internal/testutils/random.go index 2c8a2809dc784..d9a06da43ba4e 100644 --- a/go/parquet/internal/testutils/random.go +++ b/go/parquet/internal/testutils/random.go @@ -438,15 +438,16 @@ func fillRandomIsValid(seed uint64, pctNull float64, out []bool) { // If the type is parquet.ByteArray or parquet.FixedLenByteArray, heap must not be null. // // The default values are: -// []bool uses the current time as the seed with only values of 1 being false, for use -// of creating validity boolean slices. -// all other types use 0 as the seed -// a []parquet.ByteArray is populated with lengths between 2 and 12 -// a []parquet.FixedLenByteArray is populated with fixed size random byte arrays of length 12. +// +// []bool uses the current time as the seed with only values of 1 being false, for use +// of creating validity boolean slices. +// all other types use 0 as the seed +// a []parquet.ByteArray is populated with lengths between 2 and 12 +// a []parquet.FixedLenByteArray is populated with fixed size random byte arrays of length 12. func InitValues(values interface{}, heap *memory.Buffer) { switch arr := values.(type) { case []bool: - fillRandomIsValid(uint64(time.Now().Unix()), 1.0, arr) + fillRandomIsValid(uint64(time.Now().Unix()), 0.5, arr) case []int32: FillRandomInt32(0, arr) case []int64: diff --git a/go/parquet/internal/utils/bit_reader_test.go b/go/parquet/internal/utils/bit_reader_test.go index 317cc4960afe2..c285f5165cc0a 100644 --- a/go/parquet/internal/utils/bit_reader_test.go +++ b/go/parquet/internal/utils/bit_reader_test.go @@ -494,7 +494,7 @@ func (r *RLERandomSuite) checkRoundTrip(vals []uint64, width int) bool { func (r *RLERandomSuite) checkRoundTripSpaced(vals arrow.Array, width int) { nvalues := vals.Len() - bufsize := utils.MaxBufferSize(width, nvalues) + bufsize := utils.MaxRLEBufferSize(width, nvalues) buffer := make([]byte, bufsize) encoder := utils.NewRleEncoder(utils.NewWriterAtBuffer(buffer), width) diff --git a/go/parquet/internal/utils/rle.go b/go/parquet/internal/utils/rle.go index 866d7c61b4099..fef322c6fd560 100644 --- a/go/parquet/internal/utils/rle.go +++ b/go/parquet/internal/utils/rle.go @@ -37,13 +37,13 @@ const ( MaxValuesPerLiteralRun = (1 << 6) * 8 ) -func MinBufferSize(bitWidth int) int { +func MinRLEBufferSize(bitWidth int) int { maxLiteralRunSize := 1 + bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth)) maxRepeatedRunSize := binary.MaxVarintLen32 + bitutil.BytesForBits(int64(bitWidth)) return int(utils.Max(maxLiteralRunSize, maxRepeatedRunSize)) } -func MaxBufferSize(width, numValues int) int { +func MaxRLEBufferSize(width, numValues int) int { bytesPerRun := width numRuns := int(bitutil.BytesForBits(int64(numValues))) literalMaxSize := numRuns + (numRuns * bytesPerRun)