Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
pdoerner committed Oct 8, 2024
1 parent c8743d5 commit eda7f8a
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,77 @@
import io.temporal.api.common.v1.Link;
import io.temporal.api.enums.v1.EventType;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.StringTokenizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LinkConverter {

private static final Logger log = LoggerFactory.getLogger(StateMachines.class);

private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history";
private static final String linkQueryFormat =
"?eventID=%d&eventType=%s&referenceType=EventReference";

public static io.temporal.api.nexus.v1.Link WorkflowEventToNexusLink(Link.WorkflowEvent we) {
String url =
String.format(linkPathFormat, we.getNamespace(), we.getWorkflowId(), we.getRunId());
if (we.hasEventRef()) {
url +=
public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) {
try {
String url =
String.format(
linkQueryFormat, we.getEventRef().getEventId(), we.getEventRef().getEventType());
linkPathFormat,
URLEncoder.encode(we.getNamespace(), StandardCharsets.UTF_8.toString()),
URLEncoder.encode(we.getWorkflowId(), StandardCharsets.UTF_8.toString()),
URLEncoder.encode(we.getRunId(), StandardCharsets.UTF_8.toString()));

if (we.hasEventRef()) {
url += "?";
if (we.getEventRef().getEventId() > 0) {
url += "eventID=" + we.getEventRef().getEventId() + "&";
}
url +=
"eventType="
+ URLEncoder.encode(
we.getEventRef().getEventType().name(), StandardCharsets.UTF_8.toString())
+ "&";
url += "referenceType=EventReference";
}

return io.temporal.api.nexus.v1.Link.newBuilder()
.setUrl(url)
.setType(we.getDescriptorForType().getFullName())
.build();
} catch (Exception e) {
log.error("Failed to encode Nexus link URL", e);
}
return io.temporal.api.nexus.v1.Link.newBuilder()
.setUrl(url)
.setType(we.getDescriptorForType().getFullName())
.build();
return null;
}

public static Link NexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusLink) {
public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusLink) {
Link.Builder link = Link.newBuilder();
try {
URI uri = new URI(nexusLink.getUrl());

StringTokenizer st = new StringTokenizer(uri.getPath(), "/");
st.nextToken(); // /namespaces/
String namespace = st.nextToken();
st.nextToken(); // /workflows/
String workflowID = st.nextToken();
String runID = st.nextToken();
if (!uri.getScheme().equals("temporal")) {
log.error("Failed to parse Nexus link URL: invalid scheme: {}", uri.getScheme());
return null;
}

StringTokenizer st = new StringTokenizer(uri.getRawPath(), "/");
if (!st.nextToken().equals("namespaces")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String namespace = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.nextToken().equals("workflows")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String workflowID = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
String runID = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.hasMoreTokens() || !st.nextToken().equals("history")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}

Link.WorkflowEvent.Builder we =
Link.WorkflowEvent.newBuilder()
Expand All @@ -67,7 +104,8 @@ public static Link NexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusL
if (uri.getQuery() != null) {
Link.WorkflowEvent.EventReference.Builder eventRef =
Link.WorkflowEvent.EventReference.newBuilder();
st = new StringTokenizer(uri.getQuery(), "&");
String query = URLDecoder.decode(uri.getQuery(), StandardCharsets.UTF_8.toString());
st = new StringTokenizer(query, "&");
while (st.hasMoreTokens()) {
String[] param = st.nextToken().split("=");
switch (param[0]) {
Expand All @@ -79,9 +117,11 @@ public static Link NexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusL
}
}
we.setEventRef(eventRef);
link.setWorkflowEvent(we);
}
} catch (URISyntaxException e) {
} catch (Exception e) {
// Swallow un-parsable links since they are not critical to processing
log.error("Failed to parse Nexus link URL", e);
return null;
}
return link.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ private static void scheduleNexusOperation(
NexusTaskToken taskToken = new NexusTaskToken(ref, data.getAttempt(), false);

Link link =
WorkflowEventToNexusLink(
workflowEventToNexusLink(
io.temporal.api.common.v1.Link.WorkflowEvent.newBuilder()
.setNamespace(ctx.getNamespace())
.setWorkflowId(ctx.getExecution().getWorkflowId())
Expand Down Expand Up @@ -755,7 +755,7 @@ private static void startNexusOperation(
.equals(io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor().getFullName())) {
continue;
}
event.addLinks(NexusLinkToWorkflowEvent(l));
event.addLinks(nexusLinkToWorkflowEvent(l));
}

ctx.addEvent(event.build());
Expand Down
Loading

0 comments on commit eda7f8a

Please sign in to comment.