Skip to content

Commit

Permalink
Merge pull request #2278 from lsst-sqre/tickets/DM-39664
Browse files Browse the repository at this point in the history
[DM-39664] Deploy MirrorMaker 2 for replicating EFD data at the Summit (Yagan) to Base (Manke)
  • Loading branch information
afausti authored Jul 4, 2023
2 parents f77bd03 + 0beafa8 commit fb5898c
Show file tree
Hide file tree
Showing 13 changed files with 323 additions and 29 deletions.
5 changes: 5 additions & 0 deletions applications/sasquatch/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ dependencies:
version: 2.1.1
repository: https://helm.influxdata.com/
- name: kafka-connect-manager
alias: kafka-connect-manager
condition: kafka-connect-manager.enabled
version: 1.0.0
- name: kafka-connect-manager
alias: source-kafka-connect-manager
condition: source-kafka-connect-manager.enabled
version: 1.0.0
- name: chronograf
condition: chronograf.enabled
version: 1.2.5
Expand Down
1 change: 1 addition & 0 deletions applications/sasquatch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Rubin Observatory's telemetry service.
| kapacitor.resources.requests.cpu | int | `1` | |
| kapacitor.resources.requests.memory | string | `"1Gi"` | |
| rest-proxy | object | `{"enabled":false}` | Override rest-proxy configuration. |
| source-kafka-connect-manager | object | `{"enabled":false,"env":{"kafkaConnectUrl":"http://sasquatch-source-connect-api.sasquatch:8083"}}` | Override source-kafka-connect-manager configuration. |
| squareEvents.enabled | bool | `false` | Enable the Square Events subchart with topic and user configurations. |
| strimzi-kafka | object | `{}` | Override strimzi-kafka configuration. |
| strimzi-registry-operator | object | `{"clusterName":"sasquatch","clusterNamespace":"sasquatch","operatorNamespace":"sasquatch"}` | strimzi-registry-operator configuration. |
Expand Down
7 changes: 4 additions & 3 deletions applications/sasquatch/charts/kafka-connect-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ A subchart to deploy the Kafka connectors used by Sasquatch.
| env.kafkaUsername | string | `"kafka-connect-manager"` | Username for SASL authentication. |
| image.pullPolicy | string | `"IfNotPresent"` | |
| image.repository | string | `"ghcr.io/lsst-sqre/kafkaconnect"` | |
| image.tag | string | `"1.2.0"` | |
| image.tag | string | `"1.3.1"` | |
| influxdbSink.autoUpdate | bool | `true` | If autoUpdate is enabled, check for new kafka topics. |
| influxdbSink.checkInterval | string | `"15000"` | The interval, in milliseconds, to check for new topics and update the connector. |
| influxdbSink.connectInfluxDb | string | `"efd"` | InfluxDB database to write to. |
Expand All @@ -21,11 +21,12 @@ A subchart to deploy the Kafka connectors used by Sasquatch.
| influxdbSink.connectInfluxRetryInterval | string | `"60000"` | The interval, in milliseconds, between retries. Only valid when the connectInfluxErrorPolicy is set to `RETRY`. |
| influxdbSink.connectInfluxUrl | string | `"http://sasquatch-influxdb.sasquatch:8086"` | InfluxDB URL. |
| influxdbSink.connectProgressEnabled | bool | `false` | Enables the output for how many records have been processed. |
| influxdbSink.connectors | object | `{"test":{"enabled":false,"repairerConnector":false,"tags":"","topicsRegex":".*Test"}}` | Connector instances to deploy. |
| influxdbSink.connectors | object | `{"test":{"enabled":false,"removePrefix":"source.","repairerConnector":false,"tags":"","topicsRegex":"source.lsst.sal.Test"}}` | Connector instances to deploy. |
| influxdbSink.connectors.test.enabled | bool | `false` | Whether this connector instance is deployed. |
| influxdbSink.connectors.test.removePrefix | string | `"source."` | Remove prefix from topic name. |
| influxdbSink.connectors.test.repairerConnector | bool | `false` | Whether to deploy a repairer connector in addition to the original connector instance. |
| influxdbSink.connectors.test.tags | string | `""` | Fields in the Avro payload that are treated as InfluxDB tags. |
| influxdbSink.connectors.test.topicsRegex | string | `".*Test"` | Regex to select topics from Kafka. |
| influxdbSink.connectors.test.topicsRegex | string | `"source.lsst.sal.Test"` | Regex to select topics from Kafka. |
| influxdbSink.excludedTopicsRegex | string | `""` | Regex to exclude topics from the list of selected topics from Kafka. |
| influxdbSink.tasksMax | int | `1` | Maxium number of tasks to run the connector. |
| influxdbSink.timestamp | string | `"private_efdStamp"` | Timestamp field to be used as the InfluxDB time, if not specified use `sys_time()`. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ spec:
- name: KAFKA_CONNECT_NAME
value: influxdb-sink-{{ $key }}
- name: KAFKA_CONNECT_INFLUXDB_URL
{{- if $value.connectInfluxUrl }}
value: {{ $value.connectInfluxUrl | quote }}
{{- else }}
value: {{ $.Values.influxdbSink.connectInfluxUrl | quote }}
{{- end }}
- name: KAFKA_CONNECT_DATABASE
{{- if $value.connectInfluxDb }}
value: {{ $value.connectInfluxDb | quote }}
Expand Down Expand Up @@ -75,6 +79,10 @@ spec:
- name: KAFKA_CONNECT_INFLUXDB_TAGS
value: {{ $value.tags | quote }}
{{- end }}
{{- if $value.removePrefix }}
- name: KAFKA_CONNECT_INFLUXDB_REMOVE_PREFIX
value: {{ $value.removePrefix | quote }}
{{- end }}
- name: KAFKA_CONNECT_ERROR_POLICY
value: {{ $.Values.influxdbSink.connectInfluxErrorPolicy | quote }}
- name: KAFKA_CONNECT_MAX_RETRIES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ enabled: true

image:
repository: ghcr.io/lsst-sqre/kafkaconnect
tag: 1.2.0
tag: 1.3.1
pullPolicy: IfNotPresent

influxdbSink:
Expand Down Expand Up @@ -40,9 +40,11 @@ influxdbSink:
# -- Whether to deploy a repairer connector in addition to the original connector instance.
repairerConnector: false
# -- Regex to select topics from Kafka.
topicsRegex: ".*Test"
topicsRegex: "source.lsst.sal.Test"
# -- Fields in the Avro payload that are treated as InfluxDB tags.
tags: ""
# -- Remove prefix from topic name.
removePrefix: "source."

# The s3Sink connector assumes Parquet format with Snappy compression
# and a time based partitioner.
Expand Down
5 changes: 5 additions & 0 deletions applications/sasquatch/charts/strimzi-kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ A subchart to deploy Strimzi Kafka components for Sasquatch.
| kafka.tolerations | list | `[]` | Tolerations for Kafka broker pod assignment. |
| kafka.version | string | `"3.4.0"` | Version of Kafka to deploy. |
| mirrormaker2.enabled | bool | `false` | Enable replication in the target (passive) cluster. |
| mirrormaker2.replication.policy.class | string | IdentityReplicationPolicy | Replication policy. |
| mirrormaker2.replication.policy.separator | string | "" | Convention used to rename topics when the DefaultReplicationPolicy replication policy is used. Default is "" when the IdentityReplicationPolicy replication policy is used. |
| mirrormaker2.source.bootstrapServer | string | `""` | Source (active) cluster to replicate from. |
| mirrormaker2.source.topicsPattern | string | `"registry-schemas, lsst.sal.*"` | Topic replication from the source cluster defined as a comma-separated list or regular expression pattern. |
| mirrormaker2.sourceConnect.enabled | bool | `false` | Whether to deploy another Connect cluster for topics replicated from the source cluster. Requires the sourceRegistry enabled. |
| mirrormaker2.sourceRegistry.enabled | bool | `false` | Whether to deploy another Schema Registry for the schemas replicated from the source cluster. |
| mirrormaker2.sourceRegistry.schemaTopic | string | `"source.registry-schemas"` | Name of the topic Schema Registry topic replicated from the source cluster |
| registry.schemaTopic | string | `"registry-schemas"` | Name of the topic used by the Schema Registry |
| superusers | list | `["kafka-admin"]` | A list of usernames for users who should have global admin permissions. These users will be created, along with their credentials. |
| users.kafdrop.enabled | bool | `true` | Enable user Kafdrop (deployed by parent Sasquatch chart). |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ spec:
# The frequency to check for new topics.
refresh.topics.interval.seconds: 60
# Policy to define the remote topic naming convention.
# This setting will preserve topic names in the target cluster.
replication.policy.separator: ""
replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
# The default is to preserve topic names in the target cluster.
# To add the source cluster alias as a prefix to the topic name, use replication.policy.separator="." and replication.policy.class="org.apache.kafka.connect.mirror.DefaultReplicationPolicy"
replication.policy.separator: {{ default "" .Values.mirrormaker2.replication.policy.separator }}
replication.policy.class: {{ default "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" .Values.mirrormaker2.replication.policy.class }}
# Handling high volumes of messages
# By increasing the batch size, produce requests are delayed and more messages are
# added to the batch and sent to brokers at the same time.
Expand All @@ -76,7 +77,6 @@ spec:
# Increase request timeout
producer.request.timeout.ms: 120000
consumer.request.timeout.ms: 120000

heartbeatConnector:
config:
heartbeats.topic.replication.factor: 3
Expand All @@ -91,9 +91,7 @@ spec:
sync.group.offsets.interval.seconds: 60
# The frequency of checks for offset tracking.
emit.checkpoints.interval.seconds: 60
# Policy to define the remote topic naming convention.
# This setting will preserve topic names in the target cluster.
replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
replication.policy.class: {{ default "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" .Values.mirrormaker2.replication.policy.class }}
# Topic replication from the source cluster defined as a comma-separated list
# or regular expression pattern.
topicsPattern: {{ .Values.mirrormaker2.source.topicsPattern }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{{- if .Values.mirrormaker2.sourceConnect.enabled }}
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: {{ .Values.cluster.name }}-source
annotations:
# Use Connect REST API to configure connectors
strimzi.io/use-connector-resources: "false"
spec:
image: {{ .Values.connect.image | quote }}
replicas: {{ .Values.connect.replicas }}
bootstrapServers: {{ .Values.cluster.name }}-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: {{ .Values.cluster.name }}-cluster-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: {{ .Values.cluster.name }}-source-connect
certificate: user.crt
key: user.key
config:
group.id: {{ .Values.cluster.name }}-source-connect
offset.storage.topic: {{ .Values.cluster.name }}-source-connect-offsets
config.storage.topic: {{ .Values.cluster.name }}-source-connect-configs
status.storage.topic: {{ .Values.cluster.name }}-source-connect-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable: true
key.converter.schema.registry.url: http://sasquatch-source-schema-registry.sasquatch:8081
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable: true
value.converter.schema.registry.url: http://sasquatch-source-schema-registry.sasquatch:8081
request.timeout.ms: 120000
resources:
requests:
cpu: "2"
memory: 4Gi
limits:
cpu: "8"
memory: 24Gi
jvmOptions:
"-Xmx": "8g"
"-Xms": "8g"
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: {{ .Values.cluster.name }}-source-connect
labels:
strimzi.io/cluster: {{ .Values.cluster.name }}
spec:
authentication:
type: tls
authorization:
type: simple
acls:
- resource:
type: group
name: {{ .Values.cluster.name }}-source-connect
operation: Read
- resource:
type: group
name: "*"
patternType: literal
operation: All
- resource:
type: topic
name: "*"
patternType: literal
type: allow
host: "*"
operation: All
quotas:
producerByteRate: 1073741824
consumerByteRate: 1073741824
requestPercentage: 90
controllerMutationRate: 1000
{{- end }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{{- if .Values.mirrormaker2.sourceRegistry.enabled }}
---
apiVersion: roundtable.lsst.codes/v1beta1
kind: StrimziSchemaRegistry
metadata:
name: {{ .Values.cluster.name }}-source-schema-registry
spec:
listener: tls
compatibilityLevel: none
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: {{ .Values.cluster.name }}-source-schema-registry
labels:
strimzi.io/cluster: {{ .Values.cluster.name }}
spec:
authentication:
type: tls
authorization:
# Official docs on authorizations required for the Schema Registry:
# https://docs.confluent.io/current/schema-registry/security/index.html#authorizing-access-to-the-schemas-topic
type: simple
acls:
# Allow Read, Write and DescribeConfigs operations on the
# schemas topic
- resource:
type: topic
name: {{ .Values.mirrormaker2.sourceRegistry.schemaTopic }}
patternType: literal
operation: Read
type: allow
- resource:
type: topic
name: {{ .Values.mirrormaker2.sourceRegistry.schemaTopic }}
patternType: literal
operation: Write
type: allow
- resource:
type: topic
name: {{ .Values.mirrormaker2.sourceRegistry.schemaTopic }}
patternType: literal
operation: DescribeConfigs
type: allow
# Allow all operations on the schema-registry* group
- resource:
type: group
name: schema-registry
patternType: prefix
operation: All
type: allow
# Allow Describe on the __consumer_offsets topic
# (The official docs also mention DescribeConfigs?)
- resource:
type: topic
name: "__consumer_offsets"
patternType: literal
operation: Describe
type: allow
{{- end }}
17 changes: 17 additions & 0 deletions applications/sasquatch/charts/strimzi-kafka/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,20 @@ mirrormaker2:
bootstrapServer: ""
# -- Topic replication from the source cluster defined as a comma-separated list or regular expression pattern.
topicsPattern: "registry-schemas, lsst.sal.*"
replication:
policy:
# -- Convention used to rename topics when the DefaultReplicationPolicy replication policy is used. Default is "" when the IdentityReplicationPolicy replication policy is used.
# @default -- ""
separator: "."
# -- Replication policy.
# @default -- IdentityReplicationPolicy
class: "org.apache.kafka.connect.mirror.DefaultReplicationPolicy"
sourceRegistry:
# -- Whether to deploy another Schema Registry for the schemas replicated from the source cluster.
enabled: false
# -- Name of the topic Schema Registry topic replicated from the source cluster
schemaTopic: "source.registry-schemas"
sourceConnect:
# -- Whether to deploy another Connect cluster for topics replicated from the source cluster.
# Requires the sourceRegistry enabled.
enabled: false
Loading

0 comments on commit fb5898c

Please sign in to comment.