#45 Implement arrow -> json writer #47

Draft · wants to merge 15 commits into master

Conversation

@syucream (Contributor) commented Jul 14, 2020

#45

TODO

  • prototyping for PoC: (various inputs) -> map's -> arrow -> map's -> json -> parquet
  • remove parquet writing side Go intermediates: (various inputs) -> map's -> arrow -> json -> parquet
  • remove input side Go intermediates: (various inputs) -> arrow -> json -> parquet
  • performance tests
  • record type validations (Validate record that matches with a schema at FormatToMap #27)
  • reporting benchmark results
  • check if we have any tuning points
  • finalize these changes (some refactoring, tests, docs)

@syucream (Contributor, Author) commented:

Oh, I missed that the existing arrjson package might already support arrow -> json conversion for the arrow -> map's -> json part:
https://github.com/apache/arrow/tree/master/go/arrow/internal/arrjson

It's actually an internal package, but I guess it's reusable for this use case.
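
Since arrjson lives under internal/ and can't be imported from outside the arrow module, a hand-rolled fallback is another option. Below is a minimal sketch of the arrow -> JSON step, assuming primitive columns only; recordToJSONLines and the set of handled types are illustrative, not the writer actually added in this PR:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/apache/arrow/go/arrow/array"
)

// recordToJSONLines renders an Arrow record as JSON Lines, one object per
// row. Only a few primitive column types are handled in this sketch.
func recordToJSONLines(rec array.Record) ([]byte, error) {
	var out []byte
	schema := rec.Schema()
	for row := 0; row < int(rec.NumRows()); row++ {
		obj := make(map[string]interface{}, rec.NumCols())
		for col := 0; col < int(rec.NumCols()); col++ {
			name := schema.Field(col).Name
			arr := rec.Column(col)
			if arr.IsNull(row) {
				obj[name] = nil
				continue
			}
			switch a := arr.(type) {
			case *array.Boolean:
				obj[name] = a.Value(row)
			case *array.Int64:
				obj[name] = a.Value(row)
			case *array.Float64:
				obj[name] = a.Value(row)
			case *array.String:
				obj[name] = a.Value(row)
			default:
				return nil, fmt.Errorf("unsupported column type %v", arr.DataType())
			}
		}
		line, err := json.Marshal(obj)
		if err != nil {
			return nil, err
		}
		out = append(out, line...)
		out = append(out, '\n')
	}
	return out, nil
}
```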

@syucream (Contributor, Author) commented:

I finally examined a mem pprof result. It shows lower usage than the current version's (#44 (comment)). The reduction is 543.88MB -> 97.45MB, down to about 18% of the original size! But I'm not sure if there's any other high memory consumer ...

$ ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
$ go tool pprof /tmp/columnify.mem.prof
Type: inuse_space
Time: Jul 26, 2020 at 11:18pm (JST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 97.45MB, 100% of 97.45MB total
Showing top 10 nodes out of 47
      flat  flat%   sum%        cum   cum%
   96.08MB 98.60% 98.60%    96.08MB 98.60%  github.com/apache/arrow/go/arrow/memory.(*GoAllocator).Allocate (inline)
    1.37MB  1.40%   100%     1.37MB  1.40%  github.com/klauspost/compress/zstd.encoderOptions.encoder
         0     0%   100%    72.60MB 74.50%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Append
         0     0%   100%     8.59MB  8.81%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Reserve
         0     0%   100%     8.59MB  8.81%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Resize
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Append
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Reserve
         0     0%   100%     0.57MB  0.58%  github.com/apache/arrow/go/arrow/array.(*BooleanBuilder).Resize
         0     0%   100%     3.82MB  3.92%  github.com/apache/arrow/go/arrow/array.(*Float32Builder).NewArray
         0     0%   100%     3.82MB  3.92%  github.com/apache/arrow/go/arrow/array.(*Float32Builder).NewFloat32Array
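
For reference, a heap profile like this can be captured with the standard runtime/pprof package; a minimal sketch (the output path is just an example, not a columnify flag):

```go
package main

import (
	"log"
	"os"
	"runtime"
	"runtime/pprof"
)

// writeHeapProfile dumps the current heap profile to path, for later
// inspection with `go tool pprof`.
func writeHeapProfile(path string) {
	f, err := os.Create(path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	runtime.GC() // get up-to-date allocation statistics before the snapshot
	if err := pprof.WriteHeapProfile(f); err != nil {
		log.Fatal(err)
	}
}
```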

@syucream (Contributor, Author) commented Jul 26, 2020

To resolve #27, we also need #49 ... 😭 I created another pull request, #50, to address that.

@syucream (Contributor, Author) commented:

Here's a quick performance test.

I used the dummy input Avro file below.

$ java -jar ~/tools/avro-tools-1.8.2.jar random --schema-file examples/primitives.avsc --count 1000000 tmp.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
test.seed=1595854640329
$ ls -lh tmp.avro
-rw-r--r-- 1 ryo staff 80M  7 27 21:57 tmp.avro

With the current version (0.0.3):

$ time ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType  41.95s user 1.05s system 132% cpu 32.354 total

With the latest version (this pull request):

$ time ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType  62.20s user 2.54s system 137% cpu 47.068 total

The elapsed time increased by about 1.5x ... The latest version's path contains an additional (inputs) -> map -> arrow conversion, so the slowdown isn't surprising, and we can likely reduce the time by removing the redundant (inputs) -> map conversion layer.
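
To track this regression over time, a Go benchmark around the conversion path could look like the sketch below; convertAvroToParquet is a hypothetical stand-in for the real entry point, not columnify's actual API:

```go
package columnify_test

import (
	"io/ioutil"
	"testing"
)

// convertAvroToParquet is a hypothetical stand-in for the real
// Avro -> Parquet conversion entry point.
func convertAvroToParquet(schema, records []byte) error { return nil }

func BenchmarkAvroToParquet(b *testing.B) {
	schema, err := ioutil.ReadFile("examples/primitives.avsc")
	if err != nil {
		b.Fatal(err)
	}
	records, err := ioutil.ReadFile("tmp.avro")
	if err != nil {
		b.Fatal(err)
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if err := convertAvroToParquet(schema, records); err != nil {
			b.Fatal(err)
		}
	}
}
```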

@syucream (Contributor, Author) commented:

> record type validations #27

Finally supported! If we have this schema:

$ cat columnifier/testdata/schema/primitives.avsc
{
  "type": "record",
  "name": "Primitives",
  "fields" : [
    {"name": "boolean", "type": "boolean"},
    {"name": "int",     "type": "int"},
    {"name": "long",    "type": "long"},
    {"name": "float",   "type": "float"},
    {"name": "double",  "type": "double"},
    {"name": "bytes",   "type": "bytes"},
    {"name": "string",  "type": "string"}
  ]
}

And these record values. They partially match the schema's field names and types, but some values are null, which is not allowed by the schema:

$ java -jar ~/tools/avro-tools-1.8.2.jar getschema columnifier/testdata/record/nullables.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{
  "type" : "record",
  "name" : "Nullables",
  "fields" : [ {
    "name" : "boolean",
    "type" : [ "null", "boolean" ],
    "default" : null
  }, {
    "name" : "int",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "long",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "float",
    "type" : [ "null", "float" ],
    "default" : null
  }, {
    "name" : "double",
    "type" : [ "null", "double" ],
    "default" : null
  }, {
    "name" : "bytes",
    "type" : [ "null", "bytes" ],
    "default" : null
  }, {
    "name" : "string",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}
$ java -jar ~/tools/avro-tools-1.8.2.jar tojson columnifier/testdata/record/nullables.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"boolean":null,"int":{"int":2049911329},"long":{"long":93517433735957},"float":null,"double":null,"bytes":null,"string":null}
{"boolean":null,"int":{"int":-1494730473},"long":{"long":-202289580949163},"float":null,"double":{"double":0.08069785324756118},"bytes":{"bytes":""},"string":{"string":"tpwmyxc"}}
{"boolean":{"boolean":true},"int":{"int":-1949023704},"long":{"long":516734426921889},"float":null,"double":{"double":0.6583549661805351},"bytes":null,"string":null}
{"boolean":{"boolean":false},"int":null,"long":{"long":-867000385846723},"float":{"float":0.13172472},"double":{"double":0.007504294905384068},"bytes":null,"string":null}
{"boolean":{"boolean":true},"int":null,"long":{"long":-163096126488560},"float":{"float":0.08742553},"double":{"double":0.5728205289212072},"bytes":{"bytes":"bytes5"},"string":null}
{"boolean":{"boolean":true},"int":null,"long":null,"float":null,"double":null,"bytes":{"bytes":"bytes6"},"string":{"string":"s"}}
{"boolean":{"boolean":false},"int":{"int":170755098},"long":{"long":714762663965379},"float":{"float":0.7437153},"double":null,"bytes":null,"string":null}
{"boolean":null,"int":null,"long":null,"float":null,"double":{"double":0.22171424755307045},"bytes":null,"string":{"string":"uusutbymi"}}
{"boolean":{"boolean":true},"int":{"int":-433672812},"long":{"long":460231500089382},"float":{"float":0.43936086},"double":{"double":0.4923838260209136},"bytes":{"bytes":"bytes9"},"string":null}
{"boolean":null,"int":null,"long":null,"float":null,"double":{"double":0.24505978464315714},"bytes":null,"string":null}

Then columnify fails on the schema mismatch with this error message:

$ ./columnify -schemaType avro -schemaFile columnifier/testdata/schema/primitives.avsc -recordType avro columnifier/testdata/record/nullables.avro > /dev/null
2020/07/28 00:16:38 Failed to write: unexpected input <nil> typed <nil> as bool: input record is unable to convert

By the way, the latest release version (0.0.3) throws a much less informative error message:

$ ./columnify -schemaType avro -schemaFile columnifier/testdata/schema/primitives.avsc -recordType avro columnifier/testdata/record/nullables.avro > /dev/null
2020/07/28 00:17:24 Failed to write: reflect: call of reflect.Value.Type on zero Value
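
The validation behind this error comes down to checking each input value's dynamic type before appending it to the matching Arrow builder. A rough sketch of the idea for the bool case, not the actual columnify code (a nil value fails the type assertion, which is what produces the "<nil> typed <nil>" message above):

```go
package record

import (
	"fmt"

	"github.com/apache/arrow/go/arrow/array"
)

// appendBool validates that v is actually a bool before appending it to
// the builder, so a nil or mismatched value returns an error instead of
// panicking inside reflect like v0.0.3 did.
func appendBool(b *array.BooleanBuilder, v interface{}) error {
	t, ok := v.(bool)
	if !ok {
		return fmt.Errorf("unexpected input %v typed %T as bool: input record is unable to convert", v, v)
	}
	b.Append(t)
	return nil
}
```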

@syucream (Contributor, Author) commented:

> The elapsed time increased by about 1.5x

I will profile CPU usage next.

@codecov-commenter commented Jul 28, 2020

Codecov Report

Merging #47 into master will decrease coverage by 11.69%.
The diff coverage is 39.08%.


@@             Coverage Diff             @@
##           master      #47       +/-   ##
===========================================
- Coverage   70.05%   58.36%   -11.70%     
===========================================
  Files          19       18        -1     
  Lines         875     1237      +362     
===========================================
+ Hits          613      722      +109     
- Misses        203      462      +259     
+ Partials       59       53        -6     
Flag Coverage Δ
#unittests 58.36% <39.08%> (-11.70%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
parquet/discard.go 0.00% <0.00%> (ø)
parquet/stdio.go 80.00% <ø> (ø)
schema/avro.go 100.00% <ø> (ø)
schema/bigquery.go 100.00% <ø> (ø)
schema/parquet.go 73.56% <ø> (ø)
record/arrow.go 18.28% <4.48%> (-25.47%) ⬇️
record/avro.go 77.77% <57.14%> (+5.05%) ⬆️
record/jsonl.go 75.00% <66.66%> (+18.75%) ⬆️
arrow/json/writer.go 74.75% <74.75%> (ø)
columnifier/parquet.go 80.32% <75.00%> (-4.86%) ⬇️
... and 7 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update f6d5b93...f0f8a32.

@syucream (Contributor, Author) commented:

I added benchmarks and profiling to the CI job. The CPU profile was:

      flat  flat%   sum%        cum   cum%
     5.59s 14.32% 14.32%      9.55s 24.46%  runtime.scanobject
     2.20s  5.64% 19.95%      2.59s  6.63%  runtime.findObject
     2.07s  5.30% 25.26%      6.60s 16.91%  runtime.mallocgc
     1.73s  4.43% 29.69%      1.73s  4.43%  runtime.futex
     0.90s  2.31% 31.99%      1.05s  2.69%  runtime.heapBitsSetType
     0.82s  2.10% 34.09%      0.82s  2.10%  runtime.nextFreeFast (inline)
...

And it shows some high cum% consumers:

...
     0.21s  0.54% 56.79%      7.63s 19.54%  github.com/xitongsys/parquet-go/marshal.MarshalJSON
...
     0.02s 0.051% 73.64%      6.47s 16.57%  github.com/xitongsys/parquet-go/writer.(*ParquetWriter).WriteStop
...
     0.04s   0.1% 70.62%     12.16s 31.15%  github.com/xitongsys/parquet-go/writer.(*ParquetWriter).flushObjs.func1
...
     0.04s   0.1% 71.13%     13.10s 33.56%  runtime.systemstack
...

It seems we don't have many tuning points left on our side now. So what we can do next, I guess, is look at parts related to our dependencies, mainly parquet-go.
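
For reference, the CPU profile can be captured with runtime/pprof as well; a minimal sketch of wrapping the conversion in a profile (the path and callback are illustrative):

```go
package main

import (
	"log"
	"os"
	"runtime/pprof"
)

// profileCPU runs fn while recording a CPU profile to path, for
// inspection with `go tool pprof`.
func profileCPU(path string, fn func() error) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	if err := pprof.StartCPUProfile(f); err != nil {
		return err
	}
	defer pprof.StopCPUProfile()

	return fn()
}

func main() {
	err := profileCPU("/tmp/columnify.cpu.prof", func() error {
		return nil // run the Avro -> Parquet conversion here
	})
	if err != nil {
		log.Fatal(err)
	}
}
```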

@syucream (Contributor, Author) commented:

@okkez can you check whether these changes reduce memory usage? I replaced the naive intermediate data structure with Apache Arrow records, so it has possibly been reduced. (ctx: #43)

@okkez (Contributor) commented Jul 28, 2020

I cannot convert msgpack to parquet using columnify with this PR, so I didn't measure memory usage.

$ ./columnify -recordType msgpack -schemaType avro -schemaFile ./rails_access_log.avsc rails-small.msgpack > /dev/null 
2020/07/29 08:36:09 Failed to write: unexpected input 200 typed string as int32: input record is unable to convert

But v0.0.3 can convert it.

I used the following schema and data.

{
  "name": "RailsAccessLog",
  "type": "record",
  "fields": [
    {
      "name": "container_id",
      "type": "string"
    },
    {
      "name": "container_name",
      "type": "string"
    },
    {
      "name": "source",
      "type": "string"
    },
    {
      "name": "log",
      "type": "string"
    },
    {
      "name": "__fluentd_address__",
      "type": "string"
    },
    {
      "name": "__fluentd_host__",
      "type": "string"
    },
    {
      "name": "action",
      "type": ["null", "string"]
    },
    {
      "name": "controller",
      "type": ["null", "string"]
    },
    {
      "name": "role",
      "type": "string"
    },
    {
      "name": "host",
      "type": "string"
    },
    {
      "name": "location",
      "type": ["null", "string"]
    },
    {
      "name": "severity",
      "type": ["null", "string"],
      "default": "INFO"
    },
    {
      "name": "status",
      "type": "int"
    },
    {
      "name": "db",
      "type": ["null", "float"]
    },
    {
      "name": "view",
      "type": ["null", "float"]
    },
    {
      "name": "duration",
      "type": ["null", "float"]
    },
    {
      "name": "method",
      "type": "string"
    },
    {
      "name": "path",
      "type": "string"
    },
    {
      "name": "format",
      "type": ["null", "string"]
    },
    {
      "name": "error",
      "type": ["null", "string"]
    },
    {
      "name": "remote_ip",
      "type": ["null", "string"]
    },
    {
      "name": "agent",
      "type": ["null", "string"]
    },
    {
      "name": "authenticated_user_id",
      "type": ["null", "string"]
    },
    {
      "name": "params",
      "type": ["null", "string"]
    },
    {
      "name": "tag",
      "type": "string"
    },
    {
      "name": "time",
      "type": "string"
    }
  ]
}

rails-small.msgpack.gz

I can convert msgpack to parquet after replacing int/float with string in the schema file.

@syucream (Contributor, Author) commented:

@okkez I wonder why some values are encoded as string fields ... Anyway, I re-enabled converting these values to int/float types in f0f8a32. How about that?

@okkez (Contributor) commented Jul 30, 2020

> @okkez I wonder why some values are encoded as string fields ... Anyway, I re-enabled converting these values to int/float types in f0f8a32. How about that?

Works well. But memory usage is not reduced.
Converting 223MB of msgpack to parquet uses about 2.0GB of memory (RSS from the ps command).

@syucream (Contributor, Author) commented Aug 3, 2020

I could reproduce that; RSS is indeed still quite high (though I found the memprofile result isn't so terrible, which is curious). Anyway, I'd like to find another way to reduce it. Finally supporting streaming conversion ...? That's not an easy path, but it would be more effective.
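
The idea behind streaming conversion would be to flush fixed-size batches instead of materializing the whole input, so peak memory stays proportional to the batch size rather than the file size. A generic sketch, with the batch size and signatures purely illustrative:

```go
package columnify

// convertStreaming drains records and flushes them in fixed-size batches,
// bounding memory usage by batchSize rather than by the input size.
// flush is assumed to fully consume the batch before returning.
func convertStreaming(records <-chan map[string]interface{}, flush func([]map[string]interface{}) error) error {
	const batchSize = 10000

	batch := make([]map[string]interface{}, 0, batchSize)
	for r := range records {
		batch = append(batch, r)
		if len(batch) == batchSize {
			if err := flush(batch); err != nil {
				return err
			}
			batch = batch[:0] // reuse the backing array for the next batch
		}
	}
	if len(batch) > 0 {
		return flush(batch)
	}
	return nil
}
```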

@syucream (Contributor, Author) commented Aug 6, 2020

@okkez I created another patch with a small set of changes separated from this pull request, focused on using Arrow. It supports stream-like IO and should reduce memory usage. Can you take a look?
#52

@syucream (Contributor, Author) commented Aug 6, 2020

Even using Arrow instead of the naive map[string]interface{} intermediate representation, there's no way to reduce memory usage effectively on the parquet-go side. In the end, I think we should re-implement parquet-go. 🤔
