Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Squidex/squidex
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastianStehle committed May 30, 2020
2 parents d67387e + 80bb064 commit cc3e739
Show file tree
Hide file tree
Showing 63 changed files with 1,454 additions and 474 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public AlgoliaActionHandler(RuleEventFormatter formatter)
});
}

protected override (string Description, AlgoliaJob Data) CreateJob(EnrichedEvent @event, AlgoliaAction action)
protected override async Task<(string Description, AlgoliaJob Data)> CreateJobAsync(EnrichedEvent @event, AlgoliaAction action)
{
if (@event is EnrichedContentEvent contentEvent)
{
Expand All @@ -45,7 +45,7 @@ protected override (string Description, AlgoliaJob Data) CreateJob(EnrichedEvent
AppId = action.AppId,
ApiKey = action.ApiKey,
ContentId = contentId,
IndexName = Format(action.IndexName, @event)
IndexName = await FormatAsync(action.IndexName, @event)
};

if (contentEvent.Type == EnrichedContentEventType.Deleted ||
Expand All @@ -64,7 +64,8 @@ protected override (string Description, AlgoliaJob Data) CreateJob(EnrichedEvent

if (!string.IsNullOrEmpty(action.Document))
{
jsonString = Format(action.Document, @event)?.Trim();
jsonString = await FormatAsync(action.Document, @event);
jsonString = jsonString?.Trim();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public AzureQueueActionHandler(RuleEventFormatter formatter)
});
}

protected override (string Description, AzureQueueJob Data) CreateJob(EnrichedEvent @event, AzureQueueAction action)
protected override async Task<(string Description, AzureQueueJob Data)> CreateJobAsync(EnrichedEvent @event, AzureQueueAction action)
{
var queueName = Format(action.Queue, @event);
var queueName = await FormatAsync(action.Queue, @event);

var ruleDescription = $"Send AzureQueueJob to azure queue '{queueName}'";
var ruleJob = new AzureQueueJob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public CommentActionHandler(RuleEventFormatter formatter, ICommandBus commandBus
this.commandBus = commandBus;
}

protected override (string Description, CommentJob Data) CreateJob(EnrichedEvent @event, CommentAction action)
protected override async Task<(string Description, CommentJob Data)> CreateJobAsync(EnrichedEvent @event, CommentAction action)
{
if (@event is EnrichedContentEvent contentEvent)
{
var text = Format(action.Text, @event);
var text = await FormatAsync(action.Text, @event);

var actor = contentEvent.Actor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ public DiscourseActionHandler(RuleEventFormatter formatter, IHttpClientFactory h
this.httpClientFactory = httpClientFactory;
}

protected override (string Description, DiscourseJob Data) CreateJob(EnrichedEvent @event, DiscourseAction action)
protected override async Task<(string Description, DiscourseJob Data)> CreateJobAsync(EnrichedEvent @event, DiscourseAction action)
{
var url = $"{action.Url.ToString().TrimEnd('/')}/posts.json?api_key={action.ApiKey}&api_username={action.ApiUsername}";

var json = new Dictionary<string, object>
{
["title"] = Format(action.Title, @event)
["title"] = await FormatAsync(action.Title, @event)
};

if (action.Topic.HasValue)
Expand All @@ -47,7 +47,7 @@ protected override (string Description, DiscourseJob Data) CreateJob(EnrichedEve
json.Add("category", action.Category.Value);
}

json["raw"] = Format(action.Text, @event);
json["raw"] = await FormatAsync(action.Text, @event);

var requestBody = ToJson(json);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ElasticSearchActionHandler(RuleEventFormatter formatter)
});
}

protected override (string Description, ElasticSearchJob Data) CreateJob(EnrichedEvent @event, ElasticSearchAction action)
protected override async Task<(string Description, ElasticSearchJob Data)> CreateJobAsync(EnrichedEvent @event, ElasticSearchAction action)
{
if (@event is EnrichedContentEvent contentEvent)
{
Expand All @@ -46,7 +46,7 @@ protected override (string Description, ElasticSearchJob Data) CreateJob(Enriche

var ruleJob = new ElasticSearchJob
{
IndexName = Format(action.IndexName, @event),
IndexName = await FormatAsync(action.IndexName, @event),
ServerHost = action.Host.ToString(),
ServerUser = action.Username,
ServerPassword = action.Password,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ public EmailActionHandler(RuleEventFormatter formatter)
{
}

protected override (string Description, EmailJob Data) CreateJob(EnrichedEvent @event, EmailAction action)
protected override async Task<(string Description, EmailJob Data)> CreateJobAsync(EnrichedEvent @event, EmailAction action)
{
var ruleJob = new EmailJob
{
ServerHost = action.ServerHost,
ServerUseSsl = action.ServerUseSsl,
ServerPassword = action.ServerPassword,
ServerPort = action.ServerPort,
ServerUsername = Format(action.ServerUsername, @event),
MessageFrom = Format(action.MessageFrom, @event),
MessageTo = Format(action.MessageTo, @event),
MessageSubject = Format(action.MessageSubject, @event),
MessageBody = Format(action.MessageBody, @event)
ServerUsername = await FormatAsync(action.ServerUsername, @event),
MessageFrom = await FormatAsync(action.MessageFrom, @event),
MessageTo = await FormatAsync(action.MessageTo, @event),
MessageSubject = await FormatAsync(action.MessageSubject, @event),
MessageBody = await FormatAsync(action.MessageBody, @event)
};

var description = $"Send an email to {action.MessageTo}";
Expand Down
19 changes: 19 additions & 0 deletions backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,24 @@ public sealed class KafkaAction : RuleAction
[DataType(DataType.Text)]
[Formattable]
public string TopicName { get; set; }

[Display(Name = "Payload (Optional)", Description = "Leave it empty to use the full event as body.")]
[DataType(DataType.MultilineText)]
[Formattable]
public string Payload { get; set; }

[Display(Name = "Key", Description = "The message key, commonly used for partitioning.")]
[DataType(DataType.Text)]
[Formattable]
public string Key { get; set; }

[Display(Name = "Headers (Optional)", Description = "The message headers in the format '[Key]=[Value]', one entry per line.")]
[DataType(DataType.MultilineText)]
[Formattable]
public string Headers { get; set; }

[Display(Name = "Schema (Optional)", Description = "Define a specific AVRO schema in JSON format.")]
[DataType(DataType.MultilineText)]
public string Schema { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
// ==========================================================================

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.Rules.EnrichedEvents;

Expand All @@ -24,29 +27,92 @@ public KafkaActionHandler(RuleEventFormatter formatter, KafkaProducer kafkaProdu
this.kafkaProducer = kafkaProducer;
}

protected override (string Description, KafkaJob Data) CreateJob(EnrichedEvent @event, KafkaAction action)
protected override async Task<(string Description, KafkaJob Data)> CreateJobAsync(EnrichedEvent @event, KafkaAction action)
{
string value, key;

if (!string.IsNullOrEmpty(action.Payload))
{
value = await FormatAsync(action.Payload, @event);
}
else
{
value = ToEnvelopeJson(@event);
}

if (!string.IsNullOrEmpty(action.Key))
{
key = await FormatAsync(action.Key, @event);
}
else
{
key = @event.Name;
}

var ruleJob = new KafkaJob
{
TopicName = action.TopicName,
MessageKey = @event.Name,
MessageValue = ToEnvelopeJson(@event)
MessageKey = key,
MessageValue = value,
Headers = await ParseHeadersAsync(action.Headers, @event),
Schema = action.Schema
};

return (Description, ruleJob);
}

private async Task<Dictionary<string, string>> ParseHeadersAsync(string headers, EnrichedEvent @event)
{
if (string.IsNullOrWhiteSpace(headers))
{
return null;
}

var headersDictionary = new Dictionary<string, string>();

var lines = headers.Split('\n');

foreach (var line in lines)
{
var indexEqual = line.IndexOf('=');

if (indexEqual > 0 && indexEqual < line.Length - 1)
{
var key = line.Substring(0, indexEqual);
var val = line.Substring(indexEqual + 1);

val = await FormatAsync(val, @event);

headersDictionary[key] = val;
}
}

return headersDictionary;
}

protected override async Task<Result> ExecuteJobAsync(KafkaJob job, CancellationToken ct = default)
{
try
{
await kafkaProducer.Send(job.TopicName, job.MessageKey, job.MessageValue);
var message = new Message<string, string> { Key = job.MessageKey, Value = job.MessageValue };

if (job.Headers?.Count > 0)
{
message.Headers = new Headers();

foreach (var header in job.Headers)
{
message.Headers.Add(header.Key, Encoding.UTF8.GetBytes(header.Value));
}
}

await kafkaProducer.Send(job.TopicName, message, job.Schema);

return Result.Success($"Event pushed to {job.TopicName} kafka topic.");
}
catch (Exception ex)
{
return Result.Failed(ex, "Push to Kafka failed.");
return Result.Failed(ex, $"Push to Kafka failed: {ex}");
}
}
}
Expand All @@ -58,5 +124,9 @@ public sealed class KafkaJob
public string MessageKey { get; set; }

public string MessageValue { get; set; }

public Dictionary<string, string> Headers { get; set; }

public string Schema { get; set; }
}
}
Loading

0 comments on commit cc3e739

Please sign in to comment.