Make the creation of Subjects explicit when producing to a topic #721

Open
eliax1996 opened this issue Sep 20, 2023 · 0 comments
@eliax1996 (Contributor)

Currently, if we send a POST request to /topics/topic_name whose body contains only key_schema, value_schema and the records, a Subject associated with the topic is created implicitly.
I think doing this implicitly, without a flag, is dangerous: users can end up creating new Subjects without realizing it.
It also makes the request behave differently depending on which fields are present or absent.
Issue #720 is one example of this.

If we try to produce by specifying the IDs of the key and value schemas, we get back an error, because the subject we registered does not have the -key and -value suffixes.
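A minimal sketch of the naming mismatch, assuming the REST proxy derives subject names with the usual topic-name strategy (`<topic>-key` for keys, `<topic>-value` for values). `expected_subjects` and `missing_subjects` are hypothetical helpers for illustration only, not part of any project API.

```python
def expected_subjects(topic: str) -> tuple[str, str]:
    # Assumed topic-name strategy: "<topic>-key" for keys, "<topic>-value" for values.
    return f"{topic}-key", f"{topic}-value"


def missing_subjects(topic: str, registered: set[str]) -> list[str]:
    """Return the subjects a produce-by-id request would look up but cannot find."""
    return [s for s in expected_subjects(topic) if s not in registered]
```

With only the bare topic registered, `missing_subjects("orders", {"orders"})` returns `["orders-key", "orders-value"]`, which is why producing by ID fails.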

If instead we drop the IDs from the request and include the schemas, the schema registry automatically creates the new subjects, giving the false impression that the earlier registration of the Subject is what made the produce succeed.
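To make the field-dependent behaviour concrete, here is a toy in-memory model of it. It is a sketch under stated assumptions, not the project's implementation: `ToyRegistry` and `resolve_schema_id` are hypothetical names, subject names follow the assumed `<topic>-key` / `<topic>-value` convention, and an ID takes precedence over a schema string when both are sent (consistent with the failing request described below).

```python
from dataclasses import dataclass, field


@dataclass
class ToyRegistry:
    """Hypothetical in-memory stand-in for a schema registry."""
    ids: dict[str, int] = field(default_factory=dict)             # schema text -> global id
    subjects: dict[str, list[int]] = field(default_factory=dict)  # subject -> registered ids

    def register(self, subject: str, schema: str) -> int:
        # An identical schema keeps its global id, whichever subject it is registered under.
        schema_id = self.ids.setdefault(schema, len(self.ids) + 1)
        versions = self.subjects.setdefault(subject, [])
        if schema_id not in versions:
            versions.append(schema_id)
        return schema_id


def resolve_schema_id(registry: ToyRegistry, topic: str, body: dict, is_key: bool) -> int:
    """Model of the produce-path lookup described in this issue."""
    prefix = "key" if is_key else "value"
    subject = f"{topic}-{prefix}"  # assumed topic-name strategy
    if f"{prefix}_schema_id" in body:
        # An id is only accepted if it is registered under the suffixed subject.
        schema_id = body[f"{prefix}_schema_id"]
        if schema_id not in registry.subjects.get(subject, []):
            raise LookupError(f"subject {subject!r} has no schema with id {schema_id}")
        return schema_id
    # Only the schema text was sent: the subject is created implicitly.
    return registry.register(subject, body[f"{prefix}_schema"])
```

After `registry.register("orders", schema)`, a body carrying `key_schema_id` raises `LookupError`, while a body carrying only `key_schema` silently creates `orders-key` and returns the same global ID.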

Code to replicate the issue:

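import json

# Client, KafkaRestAdminClient, new_topic, wait_for_topics, REST_HEADERS and
# NEW_TOPIC_TIMEOUT are fixtures/helpers provided by the project's test suite.

async def test_another_avro_publish(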
    rest_async_client: Client,
    registry_async_client: Client,
    admin_client: KafkaRestAdminClient,
):
    topic = new_topic(admin_client)
    other_tn = new_topic(admin_client)

    await wait_for_topics(rest_async_client, topic_names=[topic, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
    header = REST_HEADERS["avro"]

    tested_avro_schema = {
        "type": "record",
        "name": "example",
        "namespace": "example",
        "doc": "example",
        "fields": [
            {
                "type": "int",
                "name": "test",
                "doc": "my test number",
                "namespace": "test",
                "default": 5  # Avro expects an int default as a JSON number, not a string
            }
        ]
    }

    schema_str = json.dumps(tested_avro_schema)

    # Register the schema under the bare topic name (no -key/-value suffix)
    res = await registry_async_client.post(f"subjects/{topic}/versions", json={
        "schema": schema_str,
        "schemaType": "AVRO"
    })
    assert res.ok

    key_schema_id = value_schema_id = res.json()["id"]
    key_body = {
        "test": 5
    }

    value_body = {
        "test": 5
    }

    body = {
        "key_schema": schema_str,
        "value_schema": schema_str,
        "records": [
            {
                "key": key_body,
                "value": value_body
            }
        ]
    }

    # Produce with the schema strings only: new subjects are created implicitly
    url = f"/topics/{topic}"
    res = await rest_async_client.post(url, json=body, headers=header)
    assert res.ok

This works (by implicitly creating two new subjects). If instead we keep the same code but replace the body with the following one, we get an error saying that the subjects are not registered (the same happens if we also remove key_schema and value_schema):

body = {
    "key_schema_id": key_schema_id,
    "key_schema": schema_str,
    "value_schema_id": value_schema_id,  # topic -> subject -> schema
    "value_schema": schema_str,
    "records": [
        {
            "key": key_body,
            "value": value_body
        }
    ]
}

The correct way to register the subjects and produce with them is the following:

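import json

# Client, KafkaRestAdminClient, new_topic, wait_for_topics, REST_HEADERS and
# NEW_TOPIC_TIMEOUT are fixtures/helpers provided by the project's test suite.

async def test_another_avro_publish(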
    rest_async_client: Client,
    registry_async_client: Client,
    admin_client: KafkaRestAdminClient,
):
    topic = new_topic(admin_client)
    other_tn = new_topic(admin_client)

    await wait_for_topics(rest_async_client, topic_names=[topic, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
    header = REST_HEADERS["avro"]

    tested_avro_schema = {
        "type": "record",
        "name": "example",
        "namespace": "example",
        "doc": "example",
        "fields": [
            {
                "type": "int",
                "name": "test",
                "doc": "my test number",
                "namespace": "test",
                "default": 5  # Avro expects an int default as a JSON number, not a string
            }
        ]
    }

    schema_str = json.dumps(tested_avro_schema)

    # Register the schema explicitly under the {topic}-key and {topic}-value subjects
    res = await registry_async_client.post(f"subjects/{topic}-key/versions", json={
        "schema": schema_str,
        "schemaType": "AVRO"
    })
    assert res.ok

    key_schema_id = res.json()["id"]

    res = await registry_async_client.post(f"subjects/{topic}-value/versions", json={
        "schema": schema_str,
        "schemaType": "AVRO"
    })
    assert res.ok

    value_schema_id = res.json()["id"]

    key_body = {
        "test": 5
    }

    value_body = {
        "test": 5
    }

    body = {
        "key_schema_id": key_schema_id,
        "value_schema_id": value_schema_id,
        "records": [
            {
                "key": key_body,
                "value": value_body
            }
        ]
    }

    url = f"/topics/{topic}"
    res = await rest_async_client.post(url, json=body, headers=header)
    assert res.ok

This works either way, whether the request includes the IDs or the schemas.

I suggest unifying the behaviour by requiring an explicit field in the request when we want upsert semantics for the Subjects.
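One possible shape of the proposal, as a sketch only: the request field `auto_register_subjects` is a hypothetical name (the issue does not name the field), and `ToyRegistry`, `resolve_schema_id` and `SubjectNotRegistered` are illustrative stand-ins, not the project's code. A schema string that is already registered under the subject behaves exactly like sending its ID; creating or extending a subject requires the explicit flag.

```python
class SubjectNotRegistered(Exception):
    """Raised when a produce request refers to a subject/schema that is not registered."""


class ToyRegistry:
    """Hypothetical in-memory registry: subject -> {schema text: global id}."""

    def __init__(self) -> None:
        self._ids: dict[str, int] = {}
        self.subjects: dict[str, dict[str, int]] = {}

    def register(self, subject: str, schema: str) -> int:
        schema_id = self._ids.setdefault(schema, len(self._ids) + 1)
        self.subjects.setdefault(subject, {})[schema] = schema_id
        return schema_id


def resolve_schema_id(registry: ToyRegistry, topic: str, body: dict, is_key: bool) -> int:
    prefix = "key" if is_key else "value"
    subject = f"{topic}-{prefix}"  # assumed topic-name strategy
    registered = registry.subjects.get(subject, {})
    if f"{prefix}_schema_id" in body:
        schema_id = body[f"{prefix}_schema_id"]
        if schema_id not in registered.values():
            raise SubjectNotRegistered(f"{subject!r} has no schema with id {schema_id}")
        return schema_id
    schema = body[f"{prefix}_schema"]
    if schema in registered:
        return registered[schema]  # already registered: same result as sending the id
    # Hypothetical opt-in flag: only upsert the subject when explicitly asked to.
    if not body.get("auto_register_subjects", False):
        raise SubjectNotRegistered(
            f"{subject!r} does not contain this schema; set auto_register_subjects to register it"
        )
    return registry.register(subject, schema)
```

With this design, the schema-only and ID-only requests succeed or fail together, and a new subject only appears when the caller asks for it.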
