Skip to content

Commit

Permalink
fix bad handling of chunked responses in worx connections (experimental)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssadedin committed Jun 10, 2017
1 parent 29dac38 commit 3dd6412
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 45 deletions.
14 changes: 6 additions & 8 deletions src/main/groovy/bpipe/agent/Agent.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,15 @@ class Agent extends TimerTask {
if(result.commands) {
// Each command is a map, indexed by pguid mapping to a list of Command objects
// which are serialised as Maps
result.commands.each { String pguid, List<Map> commands ->
PipelineInfo pi = this.pipelines[pguid]
result.commands.each { Map command ->
PipelineInfo pi = this.pipelines[command.run.id]
if(!pi) {
log.severe "Unknown pipeline guid $pguid returned from poll for $details.pguids"
log.severe "Unknown pipeline guid $command.run.id returned from poll for $details.pguids"
return
}

// Execute the commands
for(Map commandAttributes in commands) {
executeCommand(commandAttributes, pi)
}
executeCommand(command, pi)
}
}
}
Expand All @@ -121,7 +119,7 @@ class Agent extends TimerTask {
command.args = commandAttributes.arguments.split(",")

String result = command.shellExecute(pi)
worx.sendJson("/result/$commandAttributes.id", [
worx.sendJson("/commandResult/$commandAttributes.id", [
command: commandAttributes.id,
status: "ok",
output: result
Expand All @@ -137,7 +135,7 @@ class Agent extends TimerTask {

log.severe "Command $commandAttributes failed: " + stackTrace

worx.sendJson("/result/$commandAttributes.id", [
worx.sendJson("/commandResult/$commandAttributes.id", [
command: commandAttributes.id,
status: "failed",
error: stackTrace
Expand Down
4 changes: 3 additions & 1 deletion src/main/groovy/bpipe/cmd/BpipeCommand.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ abstract class BpipeCommand {
File tmpFile = new File(COMMAND_TMP, Utils.sha1(command) + ".sh")
tmpFile.setExecutable(true)

"""chmod u+rx $tmpFile.absolutePath""".execute()
String chmodText = """chmod u+rx $tmpFile.absolutePath""".execute().text

log.info "Chmod output: $chmodText"

log.info("Executing command [$command] via temp file $tmpFile in directory $pipelineInfo.path")

Expand Down
132 changes: 98 additions & 34 deletions src/main/groovy/bpipe/worx/WorxConnection.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class WorxConnection {
/**
* Count of number of times this connection failed
*/
int failures = 0
int failures = 0

boolean closeAfterRead = false

void sendJson(String path, Object payload) {
sendJson(path, JsonOutput.toJson(payload))
Expand All @@ -67,16 +69,34 @@ class WorxConnection {
if(socket == null || socket.isClosed())
resetSocket()


if(socket == null)
return

byte [] jsonBytes = eventJson.getBytes("UTF-8")

log.info "POST $path"
log.info "POST $path"

// HttpWriter f = new HttpWriter(wrapped:new FileWriter("http.log"))
//
// f.headerLine("POST $path HTTP/1.1")
// .headerLine("Host: localhost:8080")
// .headerLine("Content-Type: application/json;charset=utf-8")
// .headerLine("Accept: */*")
// .headerLine("User-Agent: curl/7.50.0")
// .headerLine("Content-Length: " + jsonBytes.size()) // encoding?
// .headerLine("")
// .flush()
//
// f.print(eventJson+"\r\n") // note that encoding was set in creation of underlying Writer
// f.headerLine("")
// .flush()
//

socketWriter.headerLine("POST $path HTTP/1.1")
.headerLine("Content-Type: application/json; charset=utf-8")
.headerLine("Host: localhost:8080")
.headerLine("Content-Type: application/json;charset=utf-8")
.headerLine("Accept: */*")
.headerLine("User-Agent: curl/7.50.0")
.headerLine("Content-Length: " + jsonBytes.size()) // encoding?
.headerLine("")
.flush()
Expand All @@ -101,34 +121,79 @@ class WorxConnection {
// Read headers
String line
int blankCount=0
Map headers = [:]
while(true) {
line = socketReader.readLine()
// log.info "GOT RESPONSE line: " + line
if(!line && (++blankCount>0)) {
break
}
if(line)
blankCount = 0
def header = line.trim().split(':')
if(header.size()>1)
headers[header[0]] = header[1]
}

int contentLength = headers['Content-Length'].toInteger()
log.info "Content Length = " + contentLength

if(contentLength > 0) {
char [] buffer = new char[contentLength+1]
socketReader.read(buffer)
// log.info "RAW REPONSE: \n" + buffer

Object result = new JsonSlurper().parse(new StringReader(new String(buffer)))
return result
}
else {
log.info "No response data"
return [:]
Map headers = [:]

try {
List<String> nonHeaderLines = []
while(true) {
line = socketReader.readLine()
// log.fine "GOT RESPONSE line: " + line
if(!line) {
break
}
if(line)
blankCount = 0
List<String> header = line.tokenize(':')*.trim()
if(header.size()>1)
headers[header[0]] = header[1]
else
nonHeaderLines << line
}

// First non-header line should have HTTP status
if(nonHeaderLines.isEmpty())
throw new Exception("No HTTP response code returned from server!")

List<String> statusLineFields = nonHeaderLines[0].tokenize()
if(statusLineFields.size()<2)
throw new Exception("HTTP status line does not have expected format: " + nonHeaderLines[0])

Integer statusCode = statusLineFields[1].toInteger()
if(statusCode >= 400)
throw new Exception("Event post returned status code ${statusCode}:\n" + nonHeaderLines.join('\n'))

Integer contentLength = headers['Content-Length']?.toInteger()?:0
log.info "Content Length = " + contentLength

if(contentLength > 0) {
char [] buffer = new char[contentLength+1]
socketReader.read(buffer)
log.info "RAW REPONSE: \n" + buffer

Object result = new JsonSlurper().parse(new StringReader(new String(buffer)))
return result
}
else
if(headers['Transfer-Encoding'] == 'chunked') {

// Read the chunk length
String chunkLengthValue = socketReader.readLine()
Long chunkSize = Long.decode("0x" + chunkLengthValue)
log.info "Chunk size in bytes: $chunkSize"

char [] buffer = new char[chunkSize]
socketReader.read(buffer)

String payload = new String(buffer)

String newLine = socketReader.readLine()
String nextChunk = socketReader.readLine()
if(nextChunk != "0")
log.warning("next chunk length has unexpected value: $nextChunk")

socketReader.readLine() // consume final empty line

return new JsonSlurper().parse(new StringReader(payload))
}
else {
log.info "No response data"
return [:]
}
}
finally {
log.info "Closing Worx connection by server request"
this.socket.close()
this.socket = null
}
}

Expand All @@ -152,8 +217,7 @@ class WorxConnection {
socket = new Socket(url.host, url.port)

socketReader = new BufferedReader(new InputStreamReader(socket.inputStream))

socketWriter = new HttpWriter(wrapped: new PrintWriter(new OutputStreamWriter(socket.outputStream, "UTF-8")))
socketWriter = new HttpWriter(wrapped: new PrintWriter(new OutputStreamWriter(socket.outputStream, "US-ASCII")))

} catch (Exception e) {
if((failures%20) == 0) {
Expand Down
7 changes: 5 additions & 2 deletions src/main/groovy/bpipe/worx/WorxEventListener.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,15 @@ class WorxEventListener implements PipelineEventListener {

try {
String json = JsonOutput.toJson([events: [eventDetails]])
log.info "Send: $json"
worx.sendJson("/events", json)
log.info "Sent json OK"
}
catch(Throwable e) {
e.printStackTrace()
}

log.info "Read response ..."
def response = worx.readResponse()
log.info "Response: $response"

Expand All @@ -124,10 +127,10 @@ class WorxEventListener implements PipelineEventListener {
}
}
catch(Exception e) {
log.severe "Failed to send event to $socket $this"
log.info "Error sending event ($e)"
log.severe "Failed to send event to remote worx host: $e"
e.printStackTrace()
}

}

void start() {
Expand Down

0 comments on commit 3dd6412

Please sign in to comment.