diff --git a/assets/css/main.scss b/assets/css/main.scss index 18ef88f75..63cecf988 100644 --- a/assets/css/main.scss +++ b/assets/css/main.scss @@ -7,7 +7,9 @@ @import "../modules/datas/filter"; @import "../modules/datas/highlight"; +@import "../modules/forms/ace"; @import "../modules/forms/datetime"; +@import "../modules/forms/multiple"; @import "../modules/tables/table"; @import "../modules/tables/row-action"; \ No newline at end of file diff --git a/assets/css/vendor.scss b/assets/css/vendor.scss index 407b1b8d5..f30962827 100644 --- a/assets/css/vendor.scss +++ b/assets/css/vendor.scss @@ -4,6 +4,7 @@ @import "../../node_modules/font-awesome/scss/font-awesome"; @import "../../node_modules/sweetalert2/src/sweetalert2"; $bs-datetimepicker-btn-hover-bg: $gray-800; +$bs-datetimepicker-primary-border-color: $dropdown-border-color; @import "../../node_modules/tempusdominus-bootstrap-4/src/sass/tempusdominus-bootstrap-4"; @import "../../node_modules/highlight.js/styles/darkula.css"; diff --git a/assets/js/main.js b/assets/js/main.js index 081515417..0cc4078e1 100644 --- a/assets/js/main.js +++ b/assets/js/main.js @@ -7,8 +7,10 @@ import '../modules/datas/data-confirm'; import '../modules/datas/highlight'; import '../modules/datas/search-navbar'; import '../modules/datas/search-sse'; +import '../modules/forms/ace'; import '../modules/forms/config'; import '../modules/forms/get'; +import '../modules/forms/multiple'; import '../modules/tables/row-action'; import '../modules/templates/sidebar'; import '../modules/templates/toast' diff --git a/assets/modules/forms/ace.js b/assets/modules/forms/ace.js new file mode 100644 index 000000000..a1bab61d1 --- /dev/null +++ b/assets/modules/forms/ace.js @@ -0,0 +1,26 @@ +import $ from "jquery"; +import "../widget"; +import ace from 'ace-builds'; +import "ace-builds/src-noconflict/theme-tomorrow"; +import "ace-builds/webpack-resolver"; + +$.widget("khq.ace-editor", $.khq.widget, { + + _create: function () { + let textarea = this.element.find('> textarea'); + + let editor = ace.edit(this.element.find('> div')[0], { + minLines: 5, + maxLines: 48, + autoScrollEditorIntoView: true, + theme: "ace/theme/tomorrow" + }); + + editor.renderer.setScrollMargin(10, 10, 10, 10); + + editor.getSession().setValue(textarea.val()); + editor.getSession().on('change', function () { + textarea.val(editor.getSession().getValue()); + }); + } +}); diff --git a/assets/modules/forms/ace.scss b/assets/modules/forms/ace.scss new file mode 100644 index 000000000..32a1db3e9 --- /dev/null +++ b/assets/modules/forms/ace.scss @@ -0,0 +1,9 @@ +.khq-ace-editor { + > div { + height: $input-height-inner * 3 ; + } + + > textarea { + display: none; + } +} \ No newline at end of file diff --git a/assets/modules/forms/datetime.js b/assets/modules/forms/datetime.js index 298dab85a..627b66f0b 100644 --- a/assets/modules/forms/datetime.js +++ b/assets/modules/forms/datetime.js @@ -8,6 +8,25 @@ $.widget("khq.datetime", $.khq.widget, { const self = this; const element = this.element; + if (element.find('.datetime-container').length > 0) { + this._inline(element); + } else { + this._hover(element); + } + + }, + + _onChange: function(event) { + const element = this.element; + + if (event.date) { + element + .find('input') + .val(event.date.toISOString()); + } + }, + + _inline: function (element) { let datetimeTempusOptions = { inline: true, sideBySide: true, @@ -20,14 +39,17 @@ $.widget("khq.datetime", $.khq.widget, { element .find('.datetime-container') - .on('change.datetimepicker', function (event) { - if (event.date) { - element - .find('input') - .val(event.date.toISOString()); - } - }) + .on('change.datetimepicker', $.proxy(this._onChange, this)) .datetimepicker(datetimeTempusOptions); + }, + _hover: function (element) { + element + .find('input') + .on('change.datetimepicker', $.proxy(this._onChange, this)) + .datetimepicker({ + sideBySide: true, + useCurrent: false, + }); }, }); diff --git a/assets/modules/forms/datetime.scss b/assets/modules/forms/datetime.scss index 9d945e524..ac742f66a 100644 --- a/assets/modules/forms/datetime.scss +++ b/assets/modules/forms/datetime.scss @@ -11,25 +11,25 @@ border-top: 2px solid $gray-700; } } + } + + .bootstrap-datetimepicker-widget { + .timepicker-hour, .timepicker-minute, .timepicker-second { + width: 16px; + font-weight: normal; + } - .bootstrap-datetimepicker-widget { - .timepicker-hour, .timepicker-minute, .timepicker-second { + table { + & td { + height: 16px; + line-height: 16px; width: 16px; - font-weight: normal; - } - table { - & td { + span { + display: inline-block; + width: 16px; height: 16px; line-height: 16px; - width: 16px; - - span { - display: inline-block; - width: 16px; - height: 16px; - line-height: 16px; - } } } } diff --git a/assets/modules/forms/multiple.js b/assets/modules/forms/multiple.js new file mode 100644 index 000000000..6b55cb10d --- /dev/null +++ b/assets/modules/forms/multiple.js @@ -0,0 +1,20 @@ +import $ from "jquery"; +import "../widget"; + +$.widget("khq.multiple", $.khq.widget, { + + _create: function () { + let self = this; + + self.element.find('button').on("click", function (e) { + e.preventDefault(); + + let clone = $(this).closest('div').clone(); + clone.find('input').each(function(key, value) { + $(value).val(''); + }); + + self.element.append(clone); + }); + } +}); diff --git a/assets/modules/forms/multiple.scss b/assets/modules/forms/multiple.scss new file mode 100644 index 000000000..b874d11e9 --- /dev/null +++ b/assets/modules/forms/multiple.scss @@ -0,0 +1,19 @@ +.khq-multiple { + > div { + display: flex; + flex-direction: row; + margin-bottom: $form-group-margin-bottom; + + &:last-child { + margin-bottom: 0; + } + + > * { + margin-right: $form-group-margin-bottom; + + &:last-child { + margin-right: 0; + } + } + } +} \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index a3c833a98..ab261cacf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -204,6 +204,11 @@ "negotiator": "0.6.1" } }, + "ace-builds": { + "version": "1.4.2", + "resolved": "https://registry.npmjs.org/ace-builds/-/ace-builds-1.4.2.tgz", + "integrity": "sha512-M1JtZctO2Zg+1qeGUFZXtYKsyaRptqQtqpVzlj80I0NzGW9MF3um0DBuizIvQlrPYUlTdm+wcOPZpZoerkxQdA==" + }, "acorn": { "version": "5.7.3", "resolved": "https://registry.npmjs.org/acorn/-/acorn-5.7.3.tgz", diff --git a/package.json b/package.json index 74c8d8341..6b5549996 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,7 @@ "description": "Kafka GUI for topics, topics data, consumers group and more", "main": "index.js", "dependencies": { + "ace-builds": "^1.4.2", "bootstrap": "^4.2.1", "bytes": "^3.0.0", "font-awesome": "^4.7.0", diff --git a/public/topic.ftl b/public/topic.ftl index 7f175bde8..1bd133e4d 100644 --- a/public/topic.ftl +++ b/public/topic.ftl @@ -72,6 +72,9 @@ +<@template.bottom> + Produce to topic + <@template.footer/> diff --git a/public/topicProduce.ftl b/public/topicProduce.ftl new file mode 100644 index 000000000..91311ce7f --- /dev/null +++ b/public/topicProduce.ftl @@ -0,0 +1,57 @@ +<#-- @ftlvariable name="topic" type="org.kafkahq.models.Topic" --> + +<#import "/includes/template.ftl" as template> +<#import "/includes/functions.ftl" as functions> + +<@template.header "Produce to " + topic.getName(), "topic" /> + +
+
+ +
+ +
+
+
+ +
+ +
+
+
+ +
+
+ + + +
+
+
+
+ +
+ +
+
+
+ +
+
+
>
+ +
+
+
+ +
+ +
+
+ +<@template.footer/> \ No newline at end of file diff --git a/src/main/java/org/kafkahq/controllers/GroupController.java b/src/main/java/org/kafkahq/controllers/GroupController.java index 00fe117bb..76f814ae5 100644 --- a/src/main/java/org/kafkahq/controllers/GroupController.java +++ b/src/main/java/org/kafkahq/controllers/GroupController.java @@ -128,11 +128,11 @@ public Result offsetsStart(Request request, String cluster, String groupName, St @GET @Path("{groupName}/delete") - public Result delete(Request request, String cluster, String id) { + public Result delete(Request request, String cluster, String groupName) { this.toast(request, RequestHelper.runnableToToast(() -> - this.consumerGroupRepository.delete(cluster, id), - "Consumer group '" + id + "' is deleted", - "Failed to consumer group " + id + this.consumerGroupRepository.delete(cluster, groupName), + "Consumer group '" + groupName + "' is deleted", + "Failed to consumer group " + groupName )); return Results.ok(); diff --git a/src/main/java/org/kafkahq/controllers/TopicController.java b/src/main/java/org/kafkahq/controllers/TopicController.java index 8599d05b7..b814e466f 100644 --- a/src/main/java/org/kafkahq/controllers/TopicController.java +++ b/src/main/java/org/kafkahq/controllers/TopicController.java @@ -17,10 +17,8 @@ import org.kafkahq.repositories.RecordRepository; import org.kafkahq.repositories.TopicRepository; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; +import java.time.Instant; +import java.util.*; import java.util.concurrent.ExecutionException; @Path("/{cluster}/topic") @@ -85,6 +83,57 @@ public void createSubmit(Request request, Response response, String cluster) thr response.redirect("/" + cluster + "/topic"); } + @GET + @Path("{topicName}/produce") + public View produce(Request request, String cluster, String topicName) throws ExecutionException, InterruptedException { + Topic topic = this.topicRepository.findByName(topicName); + + return this.template( + request, + cluster, + Results + .html("topicProduce") + .put("topic", topic) + ); + } + + @POST + @Path("{topicName}/produce") + public void produceSubmit(Request request, Response response, String cluster, String topicName) throws Throwable { + List headersKey = request.param("headers[key][]").toList(); + List headersValue = request.param("headers[value][]").toList(); + + Map headers = new HashMap<>(); + + int i = 0; + for (String key : headersKey) { + if (key != null && !key.equals("") && headersValue.get(i) != null && !headersValue.get(i).equals("")) { + headers.put(key, headersValue.get(i)); + } + i++; + } + + this.toast(request, RequestHelper.runnableToToast(() -> { + this.recordRepository.produce( + cluster, + topicName, + request.param("value").value(), + headers, + request.param("key").toOptional(), + request.param("partition").toOptional().filter(r -> !r.equals("")).map(Integer::valueOf), + request.param("timestamp") + .toOptional(String.class) + .filter(r -> !r.equals("")) + .map(s -> Instant.parse(s).toEpochMilli()) + ); + }, + "Record created", + "Failed to produce record" + )); + + response.redirect(request.path()); + } + @GET @Path("{topicName}") public View home(Request request, String cluster, String topicName) throws ExecutionException, InterruptedException { @@ -201,7 +250,6 @@ public void updateConfig(Request request, Response response, String cluster, Str @GET @Path("{topicName}/deleteRecord") public Result deleteRecord(Request request, Response response, String cluster, String topicName, Integer partition, String key) throws Throwable { - this.toast(request, RequestHelper.runnableToToast(() -> this.recordRepository.delete( cluster, topicName, diff --git a/src/main/java/org/kafkahq/repositories/RecordRepository.java b/src/main/java/org/kafkahq/repositories/RecordRepository.java index 360cbc816..aadb32945 100644 --- a/src/main/java/org/kafkahq/repositories/RecordRepository.java +++ b/src/main/java/org/kafkahq/repositories/RecordRepository.java @@ -11,7 +11,9 @@ import lombok.experimental.Wither; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeader; import org.codehaus.httpcache4j.uri.URIBuilder; import org.jooby.Env; import org.jooby.Jooby; @@ -305,6 +307,22 @@ private ConsumerRecords poll(KafkaConsumer consu */ } + + public RecordMetadata produce(String clusterId, String topic, String value, Map headers, Optional key, Optional partition, Optional timestamp) throws ExecutionException, InterruptedException { + return kafkaModule.getProducer(clusterId).send(new ProducerRecord<>( + topic, + partition.orElse(null), + timestamp.orElse(null), + key.orElse(null), + value, + headers + .entrySet() + .stream() + .map(entry -> new RecordHeader(entry.getKey(), entry.getValue().getBytes())) + .collect(Collectors.toList()) + )).get(); + } + public void delete(String clusterId, String topic, Integer partition, String key) throws ExecutionException, InterruptedException { kafkaModule.getProducer(clusterId).send(new ProducerRecord<>( topic, diff --git a/webpack.config.js b/webpack.config.js index c7e72e9ae..d462eed80 100644 --- a/webpack.config.js +++ b/webpack.config.js @@ -30,6 +30,7 @@ module.exports = (env, argv) => { 'highlight.js', 'bytes', 'humanize-duration', + 'ace-builds', path.join(srcDirectory, 'css/vendor.scss') ], main: [