Skip to content

Commit

Permalink
Code cleanup (#48)
Browse files Browse the repository at this point in the history
* Clean up RelpBatchTest a bit

* Clean up RelpBatch

* Change US-ASCII to use StandardCharsets version

* Rename requestID to RequestID

* Remove unnecessary throws from putHeader

* Change license url to be https

* Remove unused imports
  • Loading branch information
StrongestNumber9 authored Aug 23, 2023
1 parent 35b8b37 commit 29f7bbe
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 84 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<licenses>
<license>
<name>Apache License v2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

Expand Down
10 changes: 3 additions & 7 deletions src/main/java/com/teragrep/rlp_01/AbstractRelpFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package com.teragrep.rlp_01;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
* An abstract RELP frame class, contains only the header part.
*
Expand Down Expand Up @@ -90,12 +91,7 @@ protected String readString(ByteBuffer src, int dataLength) {
if (dataLength > 0) {
byte[] bytes = new byte[dataLength];
src.get(bytes);
try {
return new String(bytes, "US-ASCII");
} catch (UnsupportedEncodingException e) {
// this really shouldn't happen..
throw new RuntimeException(e);
}
return new String(bytes, StandardCharsets.US_ASCII);
} else {
return null;
}
Expand Down
45 changes: 14 additions & 31 deletions src/main/java/com/teragrep/rlp_01/RelpBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@
*/
public class RelpBatch {

private requestID reqID;
private final RequestID reqID;

private TreeMap<Long, RelpFrameTX> requests;
private TreeMap<Long, RelpFrameRX> responses;
private final TreeMap<Long, RelpFrameTX> requests;
private final TreeMap<Long, RelpFrameRX> responses;

// Not processed queue, for asynchronous use.
private TreeSet<Long> workQueue;
private final TreeSet<Long> workQueue;

public RelpBatch() {
this.reqID = new requestID();
this.reqID = new RequestID();
this.requests = new TreeMap<Long, RelpFrameTX>();
this.responses = new TreeMap<Long, RelpFrameRX>();
this.workQueue = new TreeSet<>();
Expand Down Expand Up @@ -73,12 +73,7 @@ public long putRequest( RelpFrameTX request ) {
}

public RelpFrameTX getRequest(Long id) {
if (this.requests.containsKey(id)) {
return this.requests.get(id);
}
else {
return null;
}
return this.requests.getOrDefault(id, null);
}

public void removeRequest(Long id) {
Expand All @@ -87,12 +82,7 @@ public void removeRequest(Long id) {
}

public RelpFrameRX getResponse(Long id) {
if (this.responses.containsKey(id)) {
return this.responses.get(id);
}
else {
return null;
}
return this.responses.getOrDefault(id, null);
}

public void putResponse(Long id, RelpFrameRX response) {
Expand All @@ -102,20 +92,15 @@ public void putResponse(Long id, RelpFrameRX response) {
}

public boolean verifyTransaction(Long id) {
if( this.requests.containsKey( id ) &&
this.getResponse( id ) != null &&
this.responses.get( id ).getResponseCode() == 200 ) {
return true;
} else {
return false;
}
return this.requests.containsKey(id) &&
this.getResponse(id) != null &&
this.responses.get(id).getResponseCode() == 200;
}

public boolean verifyTransactionAll() {
Set<Long> reqIds = this.requests.keySet();
Iterator<Long> reqIt = reqIds.iterator();
while(reqIt.hasNext()) {
if (this.verifyTransaction(reqIt.next()) == false) {
for (Long reqId : reqIds) {
if (!this.verifyTransaction(reqId)) {
return false;
}
}
Expand All @@ -124,10 +109,8 @@ public boolean verifyTransactionAll() {

public void retryAllFailed() {
Set<Long> reqIds = this.requests.keySet();
Iterator<Long> reqIt = reqIds.iterator();
while(reqIt.hasNext()) {
Long reqId = reqIt.next();
if (this.verifyTransaction(reqId) == false) {
for (Long reqId : reqIds) {
if (!this.verifyTransaction(reqId)) {
this.retryRequest(reqId);
}
}
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/com/teragrep/rlp_01/RelpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.net.ssl.SSLEngine;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

Expand All @@ -41,11 +42,7 @@ public class RelpConnection implements RelpSender {
private final static byte[] OFFER;

static {
try {
OFFER = ("\nrelp_version=0\nrelp_software=RLP-01\ncommands=" + RelpCommand.SYSLOG + "\n").getBytes("US-ASCII");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
OFFER = ("\nrelp_version=0\nrelp_software=RLP-01\ncommands=" + RelpCommand.SYSLOG + "\n").getBytes(StandardCharsets.US_ASCII);
}

private enum RelpConnectionState {
Expand Down
25 changes: 10 additions & 15 deletions src/main/java/com/teragrep/rlp_01/RelpFrameTX.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package com.teragrep.rlp_01;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
* This is the frame for transmitting RELP messages.
Expand Down Expand Up @@ -56,12 +56,12 @@ public void write(ByteBuffer dst) throws IOException {
dst.put((byte)'\n');
}

public int length() throws UnsupportedEncodingException {
int txn = Integer.toString(this.transactionNumber).getBytes("US-ASCII").length;
public int length() {
int txn = Integer.toString(this.transactionNumber).getBytes(StandardCharsets.US_ASCII).length;
int sp1 = 1;
int command = this.command.getBytes("US-ASCII").length;
int command = this.command.getBytes(StandardCharsets.US_ASCII).length;
int sp2 = 1;
int length = Integer.toString(this.dataLength).getBytes("US-ASCII").length;
int length = Integer.toString(this.dataLength).getBytes(StandardCharsets.US_ASCII).length;
int sp3 = 1;
int data;
if (this.data == null) {
Expand Down Expand Up @@ -92,26 +92,21 @@ private void putData(ByteBuffer dst) {
* Writes a HEADER part of the RELP message to the byte buffer.
*
* @param dst
* @throws UnsupportedEncodingException
* Shouldn't happen for US-ASCII..
*/
private void putHeader(ByteBuffer dst) throws UnsupportedEncodingException {
dst.put(Integer.toString(this.transactionNumber).getBytes("US-ASCII"));
private void putHeader(ByteBuffer dst) {
dst.put(Integer.toString(this.transactionNumber).getBytes(StandardCharsets.US_ASCII));
dst.put((byte)' ');
dst.put(this.command.getBytes("US-ASCII"));
dst.put(this.command.getBytes(StandardCharsets.US_ASCII));
dst.put((byte)' ');
dst.put(Integer.toString(this.dataLength).getBytes("US-ASCII"));
dst.put(Integer.toString(this.dataLength).getBytes(StandardCharsets.US_ASCII));
}

/**
Override for toString method. Returns the entire RELP message formatted with spaces.
*/
@Override
public String toString() {
try {
return this.transactionNumber + " " + this.command + " " + this.dataLength + " " + (this.data != null ? new String(this.data, "UTF-8") : "");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
return this.transactionNumber + " " + this.command + " " + this.dataLength + " " + (this.data != null ? new String(this.data, StandardCharsets.UTF_8) : "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package com.teragrep.rlp_01;

public class requestID {
public class RequestID {

private long requestIdentifier;

requestID() {
RequestID() {
this.requestIdentifier = 0;
}

Expand Down
39 changes: 17 additions & 22 deletions src/test/java/com/teragrep/rlp_01/RelpBatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
package com.teragrep.rlp_01;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RelpBatchTest {
private static final String message = "syslog message";
private static final String message = "Message With Ünïcödë Characters";

@Test
public void testInsert() {
Expand All @@ -36,7 +35,7 @@ public void testInsert() {
Assertions.assertEquals(
String.format(
"0 syslog %s %s",
message.length(),
message.getBytes().length,
message
),
frame.toString(),
Expand Down Expand Up @@ -79,7 +78,8 @@ public void testGetResponse() {
RelpBatch batch = new RelpBatch();
Long id = batch.insert(message.getBytes(StandardCharsets.UTF_8));
Assertions.assertNull(batch.getResponse(id), "Got a response but shouldn't have");
batch.putResponse(id, new RelpFrameRX(id.intValue(), "X", 1, ByteBuffer.allocateDirect(1)));
String response = "200 OK";
batch.putResponse(id, new RelpFrameRX(id.intValue(), RelpCommand.SYSLOG, response.length(), createResponseBuffer(response)));
Assertions.assertNotNull(batch.getResponse(id), "Got a null response");
}

Expand All @@ -98,10 +98,7 @@ public void testVerifyTransaction() {
Long id = batch.insert(message.getBytes(StandardCharsets.UTF_8));
Assertions.assertFalse(batch.verifyTransaction(id), "Verified transaction");
String response = "200 OK";
ByteBuffer buffer = ByteBuffer.allocateDirect(response.length());
buffer.put(response.getBytes(StandardCharsets.UTF_8));
buffer.flip();
batch.putResponse(id, new RelpFrameRX(id.intValue(), RelpCommand.SYSLOG, response.length(), buffer));
batch.putResponse(id, new RelpFrameRX(id.intValue(), RelpCommand.SYSLOG, response.length(), createResponseBuffer(response)));
Assertions.assertTrue(batch.verifyTransaction(id), "Didn't verify transaction");
}

Expand All @@ -117,10 +114,8 @@ public void testVerifyTransactionAll() {
for(int i=0; i<messages; i++) {
Assertions.assertFalse(batch.verifyTransaction(ids[i]), "Verified transaction that was not completed");
String response = "200 OK";
ByteBuffer buffer = ByteBuffer.allocateDirect(response.length());
buffer.put(response.getBytes(StandardCharsets.UTF_8));
buffer.flip();
batch.putResponse(ids[i], new RelpFrameRX(ids[i].intValue(), RelpCommand.SYSLOG, response.length(), buffer));
Assertions.assertFalse(batch.verifyTransaction(ids[i]), "Verified transaction that was not completed");
batch.putResponse(ids[i], new RelpFrameRX(ids[i].intValue(), RelpCommand.SYSLOG, response.length(), createResponseBuffer(response)));
Assertions.assertTrue(batch.verifyTransaction(ids[i]), "Verified transaction that was completed");
}
Assertions.assertTrue(batch.verifyTransactionAll(), "Did not verify all transactions");
Expand All @@ -140,10 +135,7 @@ public void testRetryAllFailed() {
int resolve = 3;
for(int i=0; i<resolve; i++) {
String response = "200 OK";
ByteBuffer buffer = ByteBuffer.allocateDirect(response.length());
buffer.put(response.getBytes(StandardCharsets.UTF_8));
buffer.flip();
batch.putResponse(ids[i], new RelpFrameRX(ids[i].intValue(), RelpCommand.SYSLOG, response.length(), buffer));
batch.putResponse(ids[i], new RelpFrameRX(ids[i].intValue(), RelpCommand.SYSLOG, response.length(), createResponseBuffer(response)));
}
// Clean work queue to pretend we have sent all
int len = batch.getWorkQueueLength();
Expand All @@ -156,10 +148,7 @@ public void testRetryAllFailed() {
// Resolve last messages
for(int i=resolve; i<messages; i++) {
String response = "200 OK";
ByteBuffer buffer = ByteBuffer.allocateDirect(response.length());
buffer.put(response.getBytes(StandardCharsets.UTF_8));
buffer.flip();
batch.putResponse(ids[i], new RelpFrameRX(ids[i].intValue(), RelpCommand.SYSLOG, response.length(), buffer));
batch.putResponse(ids[i], new RelpFrameRX(ids[i].intValue(), RelpCommand.SYSLOG, response.length(), createResponseBuffer(response)));
}
// Resolve work queue again
len = batch.getWorkQueueLength();
Expand All @@ -186,9 +175,8 @@ public void testWorkerQueueSize() {
public void testPopWorkQueue() {
RelpBatch batch = new RelpBatch();
int messages = 5;
Long[] ids = new Long[messages];
for(int i=0; i<messages;i++) {
ids[i] = batch.insert(message.getBytes(StandardCharsets.UTF_8));
batch.insert(message.getBytes(StandardCharsets.UTF_8));
}
Assertions.assertEquals(messages, batch.getWorkQueueLength(), "Queue length was not as expected");
// Try depleting
Expand All @@ -197,4 +185,11 @@ public void testPopWorkQueue() {
}
Assertions.assertEquals(0, batch.getWorkQueueLength(), "Queue length was not as expected");
}

private ByteBuffer createResponseBuffer(String response) {
ByteBuffer buffer = ByteBuffer.allocateDirect(response.length());
buffer.put(response.getBytes(StandardCharsets.UTF_8));
buffer.flip();
return buffer;
}
}
2 changes: 1 addition & 1 deletion src/test/java/com/teragrep/rlp_01/RequestIDTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public class RequestIDTest {
@Test
public void testCreateIdentifier() {
requestID id = new requestID();
RequestID id = new RequestID();
Assertions.assertEquals(0, id.getNextID(), "Id did not increment correctly");
Assertions.assertEquals(1, id.getNextID(), "Id did not increment correctly");
Assertions.assertEquals(2, id.getNextID(), "Id did not increment correctly");
Expand Down

0 comments on commit 29f7bbe

Please sign in to comment.