
feat: influx inspect export parquet #25047

Open · wants to merge 16 commits into base: master-1.x
Conversation

@alespour (Contributor) commented Jun 7, 2024

Closes https://github.com/influxdata/edge/issues/672

This PR extends the influx_inspect export command with Parquet output. The code in the cmd/influx_inspect/export/parquet folder is the Parquet exporter and related code ported from idpe (the snapshot store tsm export command); only the minimal subset required for the export was ported.

New influx_inspect options:

  • -measurement - selects the measurement to be exported (required)
  • -parquet - selects Parquet output instead of line protocol
  • -chunk-size - size, in bytes, at which Parquet files are partitioned (default 100000000)

Output file(s) are created in the folder specified via the existing -out option. The limitations are:

  • -database, -retention and -measurement must be specified
  • only TSM files are exported (not WAL files, unlike when exporting to line protocol); WAL export can easily be added if requested
  • export to Parquet file(s) is done per TSM file. The reading code apparently does not process TSM files in time order of the data they contain, so the output files are not time-ordered either: for example, table-00001.parquet may contain older data than table-00000.parquet. This seems irrelevant for import.

  • I've read the contributing section of the project README.

@alespour alespour marked this pull request as ready for review June 11, 2024 08:34
@alespour alespour marked this pull request as draft June 12, 2024 09:04
@alespour (Contributor, Author) commented Jun 12, 2024

Export example cmd:

influx_inspect export -datadir /var/lib/influxdb/data/ -waldir /var/lib/influxdb/wal/ -out /bigdata/export/ -database benchmark_db -retention autogen -measurement cpu -parquet

Import via telegraf:

[[inputs.file]]
   files = ["/bigdata/export/table-*.parquet"]
   name_override = "cpu"
   data_format = "parquet"
   tag_columns = ["datacenter","hostname","os","rack","region","service","team"]
   timestamp_column = "time"
   timestamp_format = "unix_ns"

telegraf --once

@alespour alespour marked this pull request as ready for review June 12, 2024 09:41
@powersj (Contributor) commented Jun 12, 2024

@davidby-influx could we get Stuart's review on this PR? While not urgent, it would be nice to keep up the momentum on this.

@alespour I have two comments:

  • In the README I'd rather see some examples of running with this new option + the required params
  • Is there a reason you are using arrow v14 and not v16? I assume that is copied over as well?

@alespour (Contributor, Author) commented Jun 12, 2024

@powersj Yes, arrow v14 is used in the v2 exporter and it was just copied. I'll update the dependency to v16 and add some examples of running the tool with Parquet output.

@davidby-influx (Contributor) left a comment

I am not familiar with Parquet, so I leave the correctness of the algorithms to other reviewers like @stuartcarnie. This review focuses on code robustness and debuggability when errors occur from things like bad input.

cmd/influx_inspect/export/export.go (review comment; outdated, resolved)
cmd/influx_inspect/export/export.go (review comment; outdated, resolved)
// since code from v2 exporter is used, we need v2 series key
seriesKeyEx := append([]byte(models.MeasurementTagKey+"="), []byte(key)...)
// get tags
tags := models.ParseTags(models.Escaped{B: seriesKeyEx})

Is there a possibility of an error if tags comes back zero-length or nil? Does that generate legal Parquet?

Not a request for change, just a question.

cmd/influx_inspect/export/export_parquet.go (review comment; outdated, resolved)
cmd/influx_inspect/export/export_parquet.go (review comment; outdated, resolved)
cmd/influx_inspect/export/parquet/models/points.go (review comment; outdated, resolved)
cmd/influx_inspect/export/parquet/models/points.go (review comment; outdated, resolved)
@stuartcarnie (Contributor) left a comment

As noted in the comments, I have identified a few issues that must be addressed, as the data is not exporting correctly.


The following are some observations to document my understanding of the implementation, including some potential downsides. I don't know the goals of the Parquet export, so the downsides may not be a concern. @powersj and @garylfowler this is for your reference too.

Parquet file output

For each measurement, this implementation generates 1 parquet file for each TSM file, and once addressed, it will also generate a parquet file for each shard that has a WAL with data for the measurement.

For example, say there are 10 shards, each shard contains 10 TSM files, and the user exports 10 measurements; the result may be up to 1,100 Parquet files¹ (10 measurements × 10 shards × 10 TSM files = 1,000 from TSM data, plus 10 measurements × 10 shards × 1 WAL = 100 from WAL data).

Note

It is expected that each measurement must be in its own Parquet file, as the schema across measurements is potentially different.

Potential pitfalls

A disadvantage of this approach is that the schema for each Parquet file is determined by a single TSM file. This becomes an issue if the schema for a measurement varies across TSM files. Within a single shard, the field types cannot change, but the tag set may vary.

For example, if a user writes some data, such as:

m0,tag00=val00,tag01=val00 fieldF=1.3
m0,tag00=val01,tag01=val00 fieldF=1.3
m0,tag00=val00,tag01=val01 fieldF=1.3

The columns look like this:

tag00,tag01,fieldF

If a later TSM file contains writes such as:

m0,tag00=val00,tag01=val00,tag02=val00 fieldF=1.3
m0,tag00=val00,tag01=val00,tag02=val00 fieldF=1.3
m0,tag00=val00,tag01=val00,tag02=val00 fieldF=1.3

The schema now looks like this:

tag00,tag01,tag02,fieldF

In practice, that might mean that querying all the tables using tools like duckdb is more difficult, as the schema varies across files. I verified this, and whilst the data was still queryable, duckdb dropped columns that didn't exist in all Parquet files.


Note

The source implementation from Cloud 2 contains logic to merge multiple TSM files together and to generate a merged schema. If that approach is desired, then the current 1.x implementation will need to be refactored to use a streaming approach, where TSM data is read and merged from disk, and written in chunks to Parquet, much like the Cloud 2 implementation.

Footnotes

  1. Assuming each TSM file contains all measurements, and every shard contains a WAL with all measurements.

Comment on lines +93 to +95
if err := cmd.writeToParquet(newResultSet(vc), schema); err != nil {
	return err
}
Exporting a measurement that does not exist causes this line to panic.
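
One way this could be guarded (a minimal sketch against the quoted call site, not the PR's actual fix; the emptiness check and the error message are illustrative):

// Illustrative guard: return a descriptive error instead of panicking when
// the requested measurement produced no data in the value cache.
if len(vc) == 0 {
	return fmt.Errorf("measurement %q not found in TSM data, nothing to export", cmd.measurement)
}
if err := cmd.writeToParquet(newResultSet(vc), schema); err != nil {
	return err
}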


var schema *index.MeasurementSchema

for key, fields := range vc {
@stuartcarnie (Contributor) commented Jul 24, 2024

This loop does not produce a valid schema if the user inserts data with mixed tag sets.

Using influx to create some test data:

> create database mixed_schema
> use mixed_schema
Using database mixed_schema
> insert test,tag0=val0 fieldF=1.2
> insert test,tag0=val0,tag1=val0 fieldF=3.1

And then running the command to export some data for the test measurement¹, we can view the data using duckdb:

duckdb -box -s "from 'parquet/*.parquet' select *"
time                        tag0  fieldF
--------------------------  ----  ------
2024-07-24 02:57:41.285306        1.2
2024-07-24 02:57:45.308777        3.1

Note

The tag1 column is missing.

We'll address the missing data in the tag0 column separately.

To produce the correct schema, all series keys must be enumerated and the tag keys merged. An example implementation is the KeyMerger type:

// tagsKeyMerger is responsible for determining a merged set of tag keys
type KeyMerger struct {
	i    int
	tmp  [][]byte
	keys [2][][]byte
}
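
For illustration only (not the actual KeyMerger API), a minimal sketch of the same idea, assuming the tags of every series have already been parsed with models.ParseTags; it collects the tag keys of all series and returns their sorted union for the table schema:

// mergeTagKeys returns the sorted union of tag keys across all series of a
// measurement. Illustrative sketch; a real implementation should reuse KeyMerger.
func mergeTagKeys(seriesTags []models.Tags) []string {
	set := make(map[string]struct{})
	for _, tags := range seriesTags {
		for _, t := range tags {
			set[string(t.Key)] = struct{}{}
		}
	}
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}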

Footnotes

  1. ranging over a map produces keys in a non-deterministic order, so you may need to run the export multiple times to achieve the result above

Comment on lines +405 to +407
if cmd.measurement != "" && cmd.measurement != strings.Split(string(measurement), ",")[0] {
	continue
}
@stuartcarnie (Contributor) commented Jul 24, 2024

Measurements can have escaped , values, so it is not possible to export all measurements with this function. Example:

> create database measurement_with_comma
> insert cols\,bad,tag0=tag0_0,tag1=tag1_0 fieldF=3.2
> insert cols\,bad,tag0=tag0_1,tag1=tag1_0 fieldF=1.2
> select * from "cols,bad"
name: cols,bad
time                fieldF tag0   tag1
----                ------ ----   ----
1721793491299269000 3.2    tag0_0 tag1_0
1721793494708317000 1.2    tag0_1 tag1_0

Use models.ParseName to extract the measurement correctly.
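
To make the escaping issue concrete, here is a hypothetical helper showing the kind of comparison that is needed; measurementFromKey is illustrative only, and the real code should simply call models.ParseName rather than re-implement the unescaping:

// measurementFromKey returns the measurement portion of a series key,
// treating a backslash-escaped comma as part of the name rather than as the
// tag separator. Hypothetical helper; prefer models.ParseName in real code.
func measurementFromKey(key []byte) string {
	var name []byte
	for i := 0; i < len(key); i++ {
		if key[i] == '\\' && i+1 < len(key) {
			name = append(name, key[i+1]) // keep the escaped character
			i++
			continue
		}
		if key[i] == ',' {
			break // first unescaped comma ends the measurement name
		}
		name = append(name, key[i])
	}
	return string(name)
}

With such a helper (or models.ParseName), the filter compares cmd.measurement against the properly parsed name instead of splitting on the first comma.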

Comment on lines +478 to +481
if cmd.measurement != "" && cmd.measurement != strings.Split(string(measurement), ",")[0] {
	continue
}


As noted previously, measurements can have escaped , values, so it is not possible to export with this function.


Data in WAL files is not exported to parquet, as there is no call to exportDone.

Comment on lines +337 to +338
_, t.buf.tags = models.ParseKeyBytesWithTags(groupKey, t.buf.tags)
tagSet := t.buf.tags[1:] // skip measurement, which is always the first tag key (\x00)

This results in the first tag key column dropping all its data, as this code assumes a V2 series key, which it is not.

For example, using the following data:

> drop measurement cols
> insert cols,tag0=tag0_0,tag1=tag1_0 fieldF=3.2
> insert cols,tag0=tag0_0,tag1=tag1_1 fieldF=1.2
> insert cols,tag0=tag0_1,tag1=tag1_0 fieldF=1.3
> insert cols,tag0=tag0_2,tag1=tag1_1 fieldF=1.3
> insert cols,tag0=tag0_2,tag1=tag1_2 fieldF=4.3

The exported parquet is missing all data for column tag0:

duckdb -column -s "from 'parquet/*.parquet' select *"
time                        tag0  tag1    fieldF
--------------------------  ----  ------  ------
2024-07-24 06:04:38.273453        tag1_2  4.3
2024-07-24 06:04:07.819015        tag1_0  3.2
2024-07-24 06:04:13.952354        tag1_1  1.2
2024-07-24 06:04:21.444498        tag1_0  1.3
2024-07-24 06:04:34.224291        tag1_1  1.3

As noted earlier, given this code comes from V2, it assumes it was parsed from a V2 TSM series key. Structurally, the series keys are the same, taking the form:

<measurement>[,tag0=val,...]#!~#fieldkey

Semantically, they are different, as V2 series keys are always:

orgid+bucketid,\x00=<measurement>[,tag0=val,...]\xff=<fieldkey>#!~#fieldkey

Note

Both are ordered the same when stored in a TSM file.


One approach for situations like this is to introduce new types to encapsulate what type of TSM series key is represented by the []byte. For example, the following struct:

type SeriesKeyV1 struct {
    B []byte
}

is a series key read from disk in InfluxDB v1.x, and may look like the following:

my_measurement,mytag=val#!~#field_key

Whereas:

type SeriesKeyV2 struct {
    B []byte
}

is a V2 style series key. Given there is no org / bucket ID in InfluxDB 1.x, a default value could be assigned in place of the org and bucket IDs, such as NULL:

NULL,\x00=my_measurement,mytag=val\xff=field_key#!~#field_key

When these keys are passed around, they should never be passed as a raw []byte, but rather in their container struct.
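
To make that concrete, a sketch of what such container types and a conversion might look like, assuming the V2 key layout quoted above (the NULL prefix and the constructor are illustrative, not an existing API):

// SeriesKeyV1 wraps a series key as read from a 1.x TSM file, e.g.
// my_measurement,mytag=val#!~#field_key
type SeriesKeyV1 struct{ B []byte }

// SeriesKeyV2 wraps a Cloud 2-style series key, e.g.
// orgid+bucketid,\x00=my_measurement,mytag=val\xff=field_key#!~#field_key
type SeriesKeyV2 struct{ B []byte }

// newSeriesKeyV2 builds a V2-shaped key from already-parsed 1.x parts, using
// the placeholder "NULL" prefix in place of the org/bucket IDs. Illustrative only.
func newSeriesKeyV2(measurement string, tags models.Tags, field string) SeriesKeyV2 {
	b := []byte("NULL,\x00=" + measurement)
	for _, t := range tags {
		b = append(b, ',')
		b = append(b, t.Key...)
		b = append(b, '=')
		b = append(b, t.Value...)
	}
	b = append(b, "\xff="...)
	b = append(b, field...)
	b = append(b, "#!~#"...)
	b = append(b, field...)
	return SeriesKeyV2{B: b}
}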

Comment on lines +236 to +243
// resultSet implements resultset.ResultSet over exported TSM data
type resultSet struct {
	x          map[string]map[string][]tsm1.Value
	keys       []string
	keyIndex   int
	fields     []string
	fieldIndex int
}

This resultSet implementation returns data non-deterministically. As a result, the resulting Parquet files will be different every time. Not a bug per se, but something to bear in mind.
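
If byte-identical output ever mattered, one simple option (a sketch, not something the PR needs to do) is to iterate the map in sorted key order rather than ranging over it directly:

// sortedKeys returns the series keys of the exported value map in a stable
// order, so repeated exports of the same data produce identical files.
func sortedKeys(x map[string]map[string][]tsm1.Value) []string {
	keys := make([]string, 0, len(x))
	for k := range x {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}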

)

//
// Export to Parquet file(s) is done per each TSM file. The files are apparently not sorted.

Data within a single TSM file is sorted in ascending order by the series key, and the data within a single block is sorted by timestamp.

There is no guaranteed ordering of data over multiple TSM files. This may occur if a user is backfilling data and writes the data in no specific order.

@stuartcarnie (Contributor) left a comment

Added a comment about how the schema does vary within a single TSM file.

@@ -87,7 +91,7 @@ func (cmd *Command) exportDoneParquet(_ string) error {
	TagSet:   tagSet,
	FieldSet: fieldSet,
}
// schema does not change in a table
// schema does not change in a table in one tsm file

The tag set schema can change within a single TSM file from one series key to the next.

If a user writes the following point:

m0,tag0=val0 f0=1.3

The schema for the previous line is:

col type
tag0 string (tag)
f0 float (field)

If the next write is:

m0,tag1=val0,tag2=val1 f1=false

The schema for that line is:

col type
tag1 string (tag)
tag2 string (tag)
f1 bool (field)

Therefore, the schema must be the union of all series keys, resulting in a table schema of:

col type
tag0 string (tag)
tag1 string (tag)
tag2 string (tag)
f0 float (field)
f1 bool (field)

@alespour (Contributor, Author) replied:

I assume then that the export has to iterate over the TSM files twice. In the first pass, the complete table schema would be gathered, and in the second pass the actual data exported, correct?

@alespour (Contributor, Author) commented Jul 25, 2024

Thank you very much for your input, @stuartcarnie. Given the apparent need for better insight into TSM and the exporter code itself, I begin to wonder whether a more feasible approach would be to use the tool's existing capability to export the data to line protocol, then scan that output to extract the table schemas (1st pass), and then parse it again and write it to Parquet format (2nd pass).

@stuartcarnie (Contributor) replied, quoting the above:

> I begin to wonder whether a more feasible approach would be to use the tool's existing capability to export the data to line protocol, then scan that output to extract the table schemas (1st pass), and then parse it again and write it to Parquet format (2nd pass).

That would be very inefficient for exporting large databases. If you have access to the code I wrote for Cloud 2, that could be made to work with OSS, and it should be very efficient.

@stuartcarnie (Contributor) commented Aug 2, 2024

@alespour I'd like to recommend you consider an alternate approach using some higher-level APIs, rather than a TSMReader.

I would study the influx_tools export command:

https://github.com/influxdata/influxdb/blob/cc26b7653c7d9c383b855c3765b5deb3ec803c51/cmd/influx_tools/export

and consider adding a new command to influx_tools, called export-parquet, unless the team has strong feelings otherwise.


At a high level, I suggest processing the export per shard. That will mean using the Shard type:

influxdb/tsdb/shard.go

Lines 161 to 162 in e484c4d

// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard {

You'll use a combination of the CreateSeriesCursor API:

func (s *Shard) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {

which is responsible for iterating over all the series keys of a shard. The series keys are produced in order.

This, in combination with the CreateCursorIterator API:

func (s *Shard) CreateCursorIterator(ctx context.Context) (CursorIterator, error) {

Is used to produce data for each field.

You can see these APIs being used in the cmd/influx_tools/internal/storage package, starting with a call to the Read API:

// Read creates a ResultSet that reads all points with a timestamp ts, such that start ≤ ts < end.
func (s *Store) Read(ctx context.Context, req *ReadRequest) (*ResultSet, error) {

which returns a ResultSet:

type ResultSet struct {
	cur seriesCursor
	row seriesRow
	ci  CursorIterator
}

The existing code wraps the list of shards in a ShardGroup and so it obtains the list of fields as follows:

var itr query.Iterator
var fi query.FloatIterator
var opt = query.IteratorOptions{
	Aux:        []influxql.VarRef{{Val: "key"}},
	Authorizer: query.OpenAuthorizer,
	Ascending:  true,
	Ordered:    true,
}
if itr, err = sg.CreateIterator(ctx, &influxql.Measurement{SystemIterator: "_fieldKeys"}, opt); itr != nil && err == nil {
	if fi, err = toFloatIterator(itr); err != nil {
		goto CLEANUP
	}
	p.fields = extractFields(fi)
	fi.Close()
	return p, nil
}

As you will be working with a Shard (not a ShardGroup), you can obtain all the fields for the measurement you are processing via the MeasurementFields API directly, which takes the name of a measurement:

influxdb/tsdb/shard.go

Lines 853 to 854 in e484c4d

// MeasurementFields returns fields for a measurement.
func (s *Shard) MeasurementFields(name []byte) *MeasurementFields {

You then use the FieldKeys API to retrieve all the fields for the measurement; importantly, these are returned sorted, and you must maintain that sort order.

You can then get the type information for each field returned by the FieldKeys API using the Field API:

influxdb/tsdb/shard.go

Lines 1639 to 1640 in e484c4d

return false
}

You then treat the ResultSet as an iterator, calling the Next API to iterate over each series key and field:

// Next moves to the result set forward to the next series key.
func (r *ResultSet) Next() bool {

Note

As stated in previous comments, you will need to iterate over all the series keys first, to determine all the tag keys, to ensure the Parquet table schema is complete.

Ultimately, your goal is to replace:

// resultSet implements resultset.ResultSet over exported TSM data
type resultSet struct {
	x          map[string]map[string][]tsm1.Value
	keys       []string
	keyIndex   int
	fields     []string
	fieldIndex int
}

with a version that consumes a Shard directly.
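
Pulling those pieces together, a rough sketch of the suggested per-shard flow (using only the APIs quoted above; the schema pre-pass, error handling, and the Parquet writer are elided, and the exact ReadRequest fields are an assumption):

import (
	"context"
	"fmt"

	"github.com/influxdata/influxdb/cmd/influx_tools/internal/storage"
	"github.com/influxdata/influxdb/tsdb"
)

// exportShardToParquet sketches the suggested flow for one shard and one
// measurement. Illustrative only; see cmd/influx_tools/internal/storage for
// the real ResultSet plumbing.
func exportShardToParquet(ctx context.Context, sh *tsdb.Shard, store *storage.Store, measurement string) error {
	// Field schema: MeasurementFields returns the fields for the measurement;
	// FieldKeys returns them sorted, and that sort order must be preserved.
	mf := sh.MeasurementFields([]byte(measurement))
	if mf == nil {
		return fmt.Errorf("measurement %q not found in shard", measurement)
	}
	fieldKeys := mf.FieldKeys()
	_ = fieldKeys // these drive the Parquet column layout

	// Tag schema: enumerate every series key first (CreateSeriesCursor) and
	// merge the tag keys, so the Parquet table schema is complete.

	// Data: Read returns a ResultSet; Next advances to the next series key
	// and field, whose values are then written out in Parquet row groups.
	rs, err := store.Read(ctx, &storage.ReadRequest{ /* shard, measurement, time range */ })
	if err != nil {
		return err
	}
	for rs.Next() {
		// write the current series key / field cursor to the Parquet file
	}
	return nil
}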

@bednar (Contributor) commented Aug 14, 2024

Just a quick update from us: We are on track to use influx_tools export as a base tool to convert TSM data into Parquet format. @alespour has successfully customized the tool to iterate over shards for data access during the export phase, and we are now exploring how to integrate influx_tools export with the existing code for the Parquet exporter.

@bednar (Contributor) commented Aug 21, 2024

Status update from Bonitoo: We've prepared a refactored version of the exporter based on influx_tools, detailed in PR #25253. We are currently waiting for feedback on how to correctly create the series key in the exported file. For more information, please check out this Slack conversation: https://influxdata.slack.com/archives/C5BSZ026L/p1724142258571929?thread_ts=1721781280.449769&cid=C5BSZ026L
