Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline processor/feature to ingest document in Opensearch's time based indices based on time field of incoming record #4832

Open
akshay0709 opened this issue Aug 14, 2024 · 2 comments
Assignees
Labels
documentation Improvements or additions to documentation

Comments

@akshay0709
Copy link

akshay0709 commented Aug 14, 2024

Is your feature request related to a problem? Please describe.

We would like to see a feature in data-prepper pipeline and eventually in the opensearch ingestion to ingest documents to a specific time based index based on the timestamp field from an incoming record.

For example, lets say you have following 2 records coming from source:

{
    "message": "hello",
    "created_at": "2024-08-13",
    "type": "greeting"
}
{
    "message": "how are you",
    "created_at": "2024-08-14",
    "type": "greeting"
}

and you have the following sink

sink:
        - opensearch:
            hosts: [ "https://your-domain" ]
            aws:
                sts_role_arn: "arn:aws:iam::<acc_no>:role/test-role"
                region: "us-east-1"
                serverless: false
            index: "testindex-%{yyyy-MM-dd}"
            document_id_field: "id"

Irrespective of when the above records are consumed by the sink, it should index the docs in the index based on the timestamp of the created_at field.

Therefore the first record would be indexed into testindex-2024-08-13 and the second record should be indexed into testindex-2024-08-14

Why is this feature important?

For the best case indexing realtime data, this feature would not be required but systems sometimes fail. Consider a scenario when pipeline is writing to testindex-2024-08-13 on 13th of August and Opensearch cluster failed at 11:30 PM, the pipeline sink will be backed up on events or pipeline needs to be stopped. Opensearch cluster came back online at 2:00 AM on 14th August after performing some manual ops. Upon resuming the pipeline, technically, the records between 11:30 - 11:59 should be indexed in testindex-2024-08-13 and remaining in the testindex-2024-08-14. Therefore the pipeline will require some sort of inbuilt intelligence to read timestamps from a field of the incoming record from source and perform _bulk request to that index.

OR

If you have to replay last 7 days of data from Kafka expecting 7 different daily indices.

Majority of the times ISM policies would be used to enforce the retention (delete state) on the data and if the data is indexed into incorrect index, we would be keeping the data longer than expected.

Describe the solution you'd like

Some type of processor or field in sink similar to document_id field that would identify (or create) the index name based on the timestamp from the incoming record and index the doc into that specific index.
In simple terms, when creating the bulk object, the logic should be able to determine the index name based on the timestamp from the field of the incoming record.

Describe alternatives you've considered (Optional)
Tried out the date processor but it does not help, it does add a timestamp to the doc but does not sort it to the index based on timestamp.

Also, trying out the date name index processor from the ingest pipelines but haven't had a success yet. Maybe it will work maybe it won't but having the requested feature in data-prepper will definitely make it easier to configure something like this at a single place.

Note: If someone has already figured this out, pointers would be helpful.

@muthup
Copy link

muthup commented Aug 15, 2024

Please refer to the documentation here at https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/opensearch/#usage under section configuration options > index. you would need something like ${/created_at}.. thanks to @kkondaka

@akshay0709
Copy link
Author

@muthup @kkondaka Thank you for the pointers. We were able to achieve this by combining date processor + index name based on formatted date values. Also updated the readme for opensearch plugin with example here: #4864

@dlvenable dlvenable added documentation Improvements or additions to documentation and removed untriaged labels Aug 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation
Projects
None yet
Development

No branches or pull requests

3 participants