Skip to content

Commit

Permalink
OKafka Sample (#121)
Browse files Browse the repository at this point in the history
Signed-off-by: Anders Swanson <[email protected]>
  • Loading branch information
anders-swanson authored Sep 17, 2024
1 parent 1badb09 commit f43815b
Show file tree
Hide file tree
Showing 17 changed files with 596 additions and 2 deletions.
17 changes: 17 additions & 0 deletions database/starters/oracle-spring-boot-starter-samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Oracle Database Spring Boot Samples

The Oracle Database Spring Boot Samples module provides a suite of comprehensive Spring Boot sample applications designed to enable developers with example code for application development.

### [Oracle UCP with JPA Sample](./oracle-spring-boot-sample-ucp-jpa/README.md)

The Oracle UCP with JPA sample application demonstrates how to use the Oracle Spring Boot Starter UCP with Spring Data JPA, connecting your Oracle Database with powerful ORM abstractions that facilitate rapid development, all while using the best connection pooling library for Oracle Database with Spring Boot.

### [JSON Relational Duality Views Sample](./oracle-spring-boot-sample-json-duality/README.md)

The JSON Relational Duality Views sample application demonstrates how to use the Oracle Spring Boot Starter JSON Collections with [JSON Relational Duality Views](https://docs.oracle.com/en/database/oracle/oracle-database/23/jsnvu/overview-json-relational-duality-views.html). JSON Relational Duality Views layer the advantages of JSON document-style database over existing relational data structures — Powerful JSON views with full CRUD capabilities can be created on relational database schemas, nesting related data into a single document with unified access.

### [OKafka Sample](./oracle-spring-boot-starter-okafka/README.md)

This sample application demonstrates how to use the Oracle Spring Boot Starter OKafka with the [Kafka Java Client for Oracle Transactional Event Queues](https://github.com/oracle/okafka)

Using an in-database message broker like TxEventQ eliminates the need for external message brokers, reduces overall network traffic, simplifying your overall application architecture — and the OKafka library enables developers to create applications for TxEventQ using familiar Kafka APIs for messaging.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The Oracle Spring Boot Sample for JSON Relational Duality Views package includes

## Run the sample application

The sample application creates a temporary Oracle Free container database, and requires a docker runtime environment.
The sample application test uses Testcontainers, and creates a temporary Oracle Free container database, and requires a docker runtime environment.

To run the test application, run the following command:

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Oracle Spring Boot Sample for OKafka

With Oracle Database 23ai, powerful Kafka APIs are easily used to read and write data backed by [Transactional Event Queues (TxEventQ)](https://docs.oracle.com/en/database/oracle/oracle-database/21/adque/aq-introduction.html).

If you’re unfamiliar with TxEventQ, it’s a robust, real-time message broker that runs within Oracle Database, designed for high throughput — TxEventQ can handle approximately [100 billion messages per day](https://www.oracle.com/database/advanced-queuing/) on an 8-node Oracle RAC cluster.

This sample application demonstrates how to use the Oracle Spring Boot Starter OKafka with the [Kafka Java Client for Oracle Transactional Event Queues](https://github.com/oracle/okafka)

The Spring Boot OKafka sample application includes the following components to demonstrate application development using Kafka APIs for Oracle Transactional Event Queues:

- Sample OKafka Producers and Consumers
- Connection properties for OKafka with Oracle Database
- Topic management using OKafka admin client
- Spring Boot configuration for OKafka
- A comprehensive test using Spring Boot that produces and consumes data from Transactional Event Queues using OKafka

## Run the sample application

The sample application test uses Testcontainers, and creates a temporary Oracle Free container database, and requires a docker runtime environment.

To run the test application, run the following command:

```shell
mvn test
```

## Configure your project to use OKafka

To use OKafka from your Spring Boot application, add the following Maven dependency to your project:

```xml
<dependency>
<groupId>com.oracle.database.spring</groupId>
<artifactId>oracle-spring-boot-starter-okafka</artifactId>
</dependency>
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright (c) 2024, Oracle and/or its affiliates. -->
<!-- Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>oracle-spring-boot-starter-samples</artifactId>
<groupId>com.oracle.database.spring</groupId>
<version>24.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>oracle-spring-boot-sample-okafka</artifactId>
<version>24.2.0</version>

<name>Oracle Spring Boot Starter - OKafka Sample</name>
<description>Oracle Spring Boot Starter Sample for OKafka</description>

<organization>
<name>Oracle America, Inc.</name>
<url>https://www.oracle.com</url>
</organization>

<developers>
<developer>
<name>Oracle</name>
<email>obaas_ww at oracle.com</email>
<organization>Oracle America, Inc.</organization>
<organizationUrl>https://www.oracle.com</organizationUrl>
</developer>
</developers>

<licenses>
<license>
<name>The Universal Permissive License (UPL), Version 1.0</name>
<url>https://oss.oracle.com/licenses/upl/</url>
<distribution>repo</distribution>
</license>
</licenses>

<scm>
<url>https://github.com/oracle/spring-cloud-oracle</url>
<connection>scm:git:https://github.com/oracle/spring-cloud-oracle.git</connection>
<developerConnection>scm:git:[email protected]:oracle/spring-cloud-oracle.git</developerConnection>
</scm>

<dependencies>
<dependency>
<groupId>com.oracle.database.spring</groupId>
<artifactId>oracle-spring-boot-starter-okafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!-- Test Dependencies-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot-dependencies.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-free</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.okafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Component;

import static com.oracle.database.spring.okafka.OKafkaConfiguration.TOPIC_NAME;

@Component
public class OKafkaComponent {
private final AsyncTaskExecutor taskExecutor;
private final Properties kafkaProperties;
private final SampleProducer<String> sampleProducer;
private final SampleConsumer<String> sampleConsumer;

// The tasks list is used to track and wait for consumer/producer execution.
private final List<Future<?>> tasks = new ArrayList<>();

public OKafkaComponent(@Qualifier("applicationTaskExecutor") AsyncTaskExecutor taskExecutor,
@Qualifier("kafkaProperties") Properties kafkaProperties,
@Qualifier("sampleProducer") SampleProducer<String> sampleProducer,
@Qualifier("sampleConsumer") SampleConsumer<String> sampleConsumer) {
this.taskExecutor = taskExecutor;
this.kafkaProperties = kafkaProperties;
this.sampleProducer = sampleProducer;
this.sampleConsumer = sampleConsumer;
}

@PostConstruct
public void init() {
// Create a new OKafka topic
NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
OKafkaUtil.createTopicIfNotExists(kafkaProperties, topic);

// Start the producer and consumer
tasks.add(taskExecutor.submit(sampleProducer));
tasks.add(taskExecutor.submit(sampleConsumer));
}

public void await() throws ExecutionException, InterruptedException {
for (Future<?> task : tasks) {
task.get();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.okafka;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;
import java.util.stream.Stream;

import org.apache.kafka.clients.consumer.Consumer;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OKafkaConfiguration {
public static final String TOPIC_NAME = "OKAFKA_SAMPLE";

@Value("${ojdbc.path}")
private String ojdbcPath;

@Value("${bootstrap.servers}")
private String bootstrapServers;

// We use the default 23ai Free service name
@Value("${service.name:freepdb1}")
private String serviceName;

// We use plaintext for a containerized, local database.
// Use SSL for wallet connections, like Autonomous Database.
@Value("${security.protocol:PLAINTEXT}")
private String securityProtocol;

@Value("${producer.stream.file}")
private String producerFile;

@Value("${expected.messages:50}")
private int expectedMessages;


@Bean
@Qualifier("okafkaProperties")
public Properties kafkaProperties() {
return OKafkaUtil.getConnectionProperties(ojdbcPath,
bootstrapServers,
securityProtocol,
serviceName);
}

@Bean
@Qualifier("sampleProducer")
public SampleProducer<String> sampleProducer() throws IOException {
// Create the OKafka Producer.
Properties props = kafkaProperties();
props.put("enable.idempotence", "true");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Note the use of the org.oracle.okafka.clients.producer.KafkaProducer class, for Oracle TxEventQ.
KafkaProducer<String, String> okafkaProducer = new KafkaProducer<>(props);
// We create a stream of a data from a file to give the producer input messages.
Stream<String> producerData = Files.lines(new File(producerFile).toPath());
return new SampleProducer<>(okafkaProducer, TOPIC_NAME, producerData);
}

@Bean
@Qualifier("sampleConsumer")
public SampleConsumer<String> sampleConsumer() {
// Create the OKafka Consumer.
Properties props = kafkaProperties();
props.put("group.id" , "MY_CONSUMER_GROUP");
props.put("enable.auto.commit","false");
props.put("max.poll.records", 2000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Note the use of the org.oracle.okafka.clients.producer.KafkaConsumer class, for Oracle TxEventQ.
Consumer<String, String> okafkaConsumer = new KafkaConsumer<>(props);
return new SampleConsumer<>(okafkaConsumer, TOPIC_NAME, expectedMessages);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.okafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class OKafkaSampleApp {
public static void main(String[] args) {
SpringApplication.run(OKafkaSampleApp.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.okafka;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.oracle.okafka.clients.admin.AdminClient;
public class OKafkaUtil {
public static Properties getConnectionProperties(String ojdbcPath,
String bootstrapServers,
String securityProtocol,
String serviceName) {
Properties props = new Properties();
props.put("oracle.service.name", serviceName);
props.put("security.protocol", securityProtocol);
props.put("bootstrap.servers", bootstrapServers);
// If using Oracle Database wallet, pass wallet directory
props.put("oracle.net.tns_admin", ojdbcPath);
return props;
}

public static void createTopicIfNotExists(Properties okafkaProperties,
NewTopic newTopic) {
try (Admin admin = AdminClient.create(okafkaProperties)) {
admin.createTopics(Collections.singletonList(newTopic))
.all()
.get();
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof TopicExistsException) {
System.out.println("Topic already exists, skipping creation");
} else {
throw new RuntimeException(e);
}
}
}
}
Loading

0 comments on commit f43815b

Please sign in to comment.