
Common Protocol

Drill uses Netty for RPC. A good understanding of Netty is assumed here.

At the most basic level, Drill messages use one of five "modes":

| Message Mode Name | Number | Description |
|---|---|---|
| REQUEST | 0 | Request message sent from client or server |
| RESPONSE | 1 | Response message returned from server or client |
| RESPONSE_FAILURE | 2 | Error response |
| PING | 3 | Heartbeat message from client |
| PONG | 4 | Heartbeat response from server |

In this table, a REQUEST is sent from either end of the connection. A request invites a RESPONSE (or a RESPONSE_FAILURE). The PING/PONG messages are Drill's heartbeat keep-alive mechanism.
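
These modes map to small integer values on the wire. As a minimal sketch, the mapping could be expressed as a Java enum like the following (illustrative only; Drill's actual enum is generated from its protobuf definitions):

    // Mirror of the five RPC modes in the table above. Illustrative only;
    // Drill's real enum is protobuf-generated.
    public enum RpcMode {
      REQUEST(0),          // request from either end of the connection
      RESPONSE(1),         // normal response to a request
      RESPONSE_FAILURE(2), // error response to a request
      PING(3),             // heartbeat probe from the client
      PONG(4);             // heartbeat reply from the server

      private final int number;

      RpcMode(int number) { this.number = number; }

      public int getNumber() { return number; }
    }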

Drill messages (both REQUEST and RESPONSE) have three parts:

  • Header, which includes the RPC type and the lengths of the two body portions. The lengths are consumed internally; the RPC type is passed to the caller.
  • A Protobuf message body ("pbody")
  • Data as serialized value vectors ("dbody")

The bodies are optional: any given message may have only a pbody, only a dbody, neither, or both. The type of content is determined by the RPC type, as discussed later.
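
To make the framing concrete, here is a toy Netty decoder for the three-part message. This is a sketch under simplifying assumptions: the fixed int32 header layout and all class names are invented for illustration, whereas Drill's actual decoder works with protobuf-encoded frames.

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import java.util.List;

    // Toy decoder for the header/pbody/dbody framing described above.
    // The fixed-width header is an assumption made for illustration.
    public class ToyDrillFrameDecoder extends ByteToMessageDecoder {

      // Holder for the decoded parts handed on to the RPC layer.
      public static class InboundMessage {
        public final int rpcType;
        public final ByteBuf pBody; // protobuf body; may be empty
        public final ByteBuf dBody; // value-vector body; may be empty

        public InboundMessage(int rpcType, ByteBuf pBody, ByteBuf dBody) {
          this.rpcType = rpcType;
          this.pBody = pBody;
          this.dBody = dBody;
        }
      }

      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 12) {
          return; // wait for the full header: rpcType + two body lengths
        }
        in.markReaderIndex();
        int rpcType = in.readInt();
        int pBodyLen = in.readInt();
        int dBodyLen = in.readInt();
        if (in.readableBytes() < pBodyLen + dBodyLen) {
          in.resetReaderIndex(); // whole frame not buffered yet; try again later
          return;
        }
        // The lengths are consumed here; only the RPC type and bodies go up.
        ByteBuf pBody = in.readRetainedSlice(pBodyLen);
        ByteBuf dBody = in.readRetainedSlice(dBodyLen);
        out.add(new InboundMessage(rpcType, pBody, dBody));
      }
    }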

When receiving messages, Drill sets up a series of Netty handlers to receive and decode messages. The key handler is RpcBus, which handles the "modes" mentioned above. RpcBus handles the PING/PONG messages itself; all other messages must be handed over to other parts of Drill for processing.

RpcBus places the incoming message parts (RPC type, pbody, and dbody) into an event object: either a RequestEvent or a ResponseEvent.

Netty is asynchronous, so message processing must be done on threads other than the Netty threads. RpcBus handles Netty messages on the Netty thread, but immediately creates "Event" objects, which are handed off (via a series of indirections) to a worker thread pulled from a thread pool.
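
A minimal sketch of that handoff, using a plain ExecutorService in place of Drill's actual pool and event plumbing (all names here are illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Sketch of the Netty-to-worker handoff; class and method names are
    // illustrative, not Drill's actual plumbing.
    public class HandoffSketch {
      private final ExecutorService workerPool = Executors.newFixedThreadPool(8);

      // Called on the Netty event-loop thread with the decoded parts.
      public void messageReceived(int rpcType, byte[] pBody, byte[] dBody) {
        // Capture the parts now, then get off the Netty thread immediately.
        workerPool.submit(() -> handle(rpcType, pBody, dBody));
      }

      // Runs on a worker thread; this is where the real processing happens.
      private void handle(int rpcType, byte[] pBody, byte[] dBody) {
        // Dispatch to UserServer, DataServer, or ControlServer logic here.
      }
    }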

The worker thread (via configuration) hands control over to the handle() method on one of the BasicServer implementations:

  • UserServer: Handles communication with a Drill client.
  • DataServer: Handles data transfers.
  • ControlServer: Handles control signals between Drillbits.
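
Each handle() implementation is, at its core, a dispatch on the RPC type. A skeletal sketch, with simplified stand-ins for Drill's constants and types:

    // Skeletal dispatch in the style of a handle() implementation. The
    // constants and the ResponseSender interface are simplified stand-ins.
    public class ServerDispatchSketch {
      static final int RUN_QUERY_VALUE = 3;
      static final int CANCEL_QUERY_VALUE = 4;

      interface ResponseSender { void send(int rpcType, byte[] pBody); }

      void handle(int rpcType, byte[] pBody, byte[] dBody, ResponseSender sender) {
        switch (rpcType) {
          case RUN_QUERY_VALUE:
            // Decode the RunQuery pbody, start the query, reply QUERY_HANDLE.
            break;
          case CANCEL_QUERY_VALUE:
            // Cancel the query named in the pbody, reply ACK.
            break;
          default:
            throw new UnsupportedOperationException("Unknown RPC type " + rpcType);
        }
      }
    }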

The following sections discuss each resulting application-level protocol.

User Protocol

The user protocol, implemented in UserServer, is the easiest application protocol to understand, as it consists of a series of well-defined requests and responses.

| Operation | Request | Number | Response | Number |
|---|---|---|---|---|
| Run query | RUN_QUERY_VALUE | 3 | QUERY_HANDLE (with query ID) | 7 |
| Cancel query | CANCEL_QUERY_VALUE | 4 | ACK | 1 |
| ? | RESUME_PAUSED_QUERY_VALUE | 11 | ACK | 1 |
| ? | GET_QUERY_PLAN_FRAGMENTS_VALUE | 12 | QUERY_PLAN_FRAGMENTS | 13 |
| ? | GET_CATALOGS_VALUE | 14 | ? | ? |
| Get list of known schemas | GET_SCHEMAS_VALUE | 15 | ? | ? |
| Get list of tables in current schema | GET_TABLES_VALUE | 16 | ? | ? |
| ? | GET_COLUMNS_VALUE | 17 | ? | ? |
| ? | CREATE_PREPARED_STATEMENT_VALUE | 22 | ? | ? |

The messages imply a flow:

RUN_QUERY_VALUE : [RESUME_PAUSED_QUERY_VALUE |
                   GET_QUERY_PLAN_FRAGMENTS_VALUE]*
                  CANCEL_QUERY_VALUE ?
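
Rendered as hypothetical client-side code (sendSync() and the surrounding types are invented stand-ins, not Drill's client API), the flow looks like this:

    // Hypothetical client-side rendering of the flow above; everything
    // here is an illustrative stand-in for Drill's actual client API.
    public class QueryFlowSketch {
      static final int RUN_QUERY_VALUE = 3;
      static final int CANCEL_QUERY_VALUE = 4;
      static final int RESUME_PAUSED_QUERY_VALUE = 11;

      interface QueryClient {
        byte[] sendSync(int rpcType, byte[] pBody); // blocks for the RESPONSE
      }

      void runThenCancel(QueryClient client, byte[] runQueryBody) {
        // RUN_QUERY (3) returns a QUERY_HANDLE (7) pbody carrying the query ID.
        byte[] queryHandle = client.sendSync(RUN_QUERY_VALUE, runQueryBody);
        // Zero or more mid-flight requests may follow, keyed by that ID...
        client.sendSync(RESUME_PAUSED_QUERY_VALUE, queryHandle);
        // ...and the client may optionally cancel; the server replies with ACK (1).
        client.sendSync(CANCEL_QUERY_VALUE, queryHandle);
      }
    }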

Data Protocol

The data protocol, implemented by DataServer, is very different from the user protocol. The data protocol implements a "data tunnel" that moves data from the root of one fragment to the leaf of another. The data tunnel protocol is very basic: just connect and send batches. There is no handshake, no setup, and very little flow control.

DataServer itself implements a handle() method that accepts just a single kind of message: REQ_RECORD_BATCH_VALUE (3).
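
In sketch form (names and signature simplified; see DataServer for the real code):

    // Simplified stand-in for DataServer's single-message dispatch.
    public class DataServerSketch {
      static final int REQ_RECORD_BATCH_VALUE = 3;

      void handle(int rpcType, byte[] pBody, byte[] dBody) {
        if (rpcType != REQ_RECORD_BATCH_VALUE) {
          throw new UnsupportedOperationException(
              "DataServer handles only REQ_RECORD_BATCH, got " + rpcType);
        }
        // pBody: the FragmentRecordBatch protobuf (addresses, last-batch flag)
        // dBody: the serialized value vectors themselves
      }
    }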

The data channel uses the protobuf definitions from BitData.proto. The mapping is:

| RPC Type Name | Number | Protobuf Message |
|---|---|---|
| HANDSHAKE | 0 | BitClientHandshake, BitServerHandshake |
| ACK | 1 | N/A |
| GOODBYE | 2 | N/A |
| REQ_RECORD_BATCH | 3 | FragmentRecordBatch |

Presumably some mechanism sets up the data channel. (Need to track that down.)

Basic protocol:

| Step | Upstream | Downstream |
|---|---|---|
| Connect | Connect | Accept connection |
| Heartbeat | Sends PING | Accepts PING |
| | Accepts PONG | Sends PONG |
| Send batch | Send Batch 1 | Receive Batch 1 |
| | Send Batch 2 | Receive Batch 2 |
| | Send Batch 3 | Receive Batch 3; reply with ACK, status OK |
| Send failure | Send Batch | Reply with ACK, status FAIL |
| Flow control | Wait | |
| | Receive OK | Send OK |
| Send | ... | ... |

The FragmentRecordBatch message is the key to the protocol. It has four key parts:

  • A source address (query id, major fragment id, minor fragment id)
  • A destination address (query id, major fragment id, minor fragment id)
  • The actual record batch
  • A flag to indicate whether the batch is the last batch.
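
A plain-Java mirror of those parts (illustrative only; the real class is the protobuf-generated FragmentRecordBatch from BitData.proto, and its field names may differ):

    // Illustrative mirror of the FragmentRecordBatch's key parts; the real
    // message is protobuf-generated and its field names may differ.
    public class FragmentRecordBatchSketch {
      long queryIdPart1, queryIdPart2;  // query ID, shared by both addresses
      // Source address
      int sendingMajorFragmentId;
      int sendingMinorFragmentId;
      // Destination address (several minor fragments may share one node)
      int receivingMajorFragmentId;
      int[] receivingMinorFragmentIds;
      // True on the final batch of the stream. The record batch data itself
      // travels separately as the dbody.
      boolean isLastBatch;
    }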

Key steps:

  • Open the data tunnel using DataConnectorCreator, which creates and holds a connection to a DataServer, one for each destination endpoint.
  • Batches are sent with ...
  • DataServer handles incoming RPC messages using the handle() method. DataServer handles only RPC batch messages.
  • Each batch can be sent to any number of minor fragments on the receiving node. (Batches are multiplexed on the receiver side.) handle() starts at a random point to distribute batches across the set of receivers.
  • Batches are handled via the submit() method, which uses the (query id, major, minor) address to locate the receiving fragment, iterating over the set of minor fragments provided.
  • The fragment may have failed. In this case, the fragment may have been removed from the fragment registry. If so, the batch is simply discarded.
  • Otherwise, if the fragment exists (without a check to see whether the fragment has failed), Drill asks the fragment to handle the batch by calling FragmentManager.handle(). A special protocol here accepts the batch, but returns false if the fragment has not yet started.
  • If the fragment has not yet started, start it.
  • The FragmentManager uses an IncomingBuffers instance to dispatch the batch to the minor fragment.
  • The IncomingBuffers, when created, retrieves the set of Collector objects from the PlanFragment associated with this fragment.
  • For each batch, IncomingBuffers routes the batch to a DataCollector (while also tracking the number of incoming streams and stream EOFs).
  • The DataCollector must consume the batch, even if the fragment is busy doing other processing. It does so by implementing a producer-consumer queue, buffering batches until they can be processed; see the sketch after this list.
  • Finally, a MergingRecordBatch or UnorderedReceiverBatch pulls buffers from the queue and sends them up the fragment tree.
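
A minimal sketch of that producer-consumer buffering, using a standard BlockingQueue (class and method names are illustrative, not DataCollector's actual API):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative producer-consumer buffer in the spirit of DataCollector.
    public class BatchQueueSketch<BATCH> {
      private final BlockingQueue<BATCH> queue = new LinkedBlockingQueue<>();

      // Producer side, called from the RPC path: always accepts the batch,
      // even while the fragment is busy doing other processing.
      public void enqueue(BATCH batch) {
        queue.add(batch);
      }

      // Consumer side, called by MergingRecordBatch or UnorderedReceiverBatch
      // when the fragment wants more input; blocks until a batch is available.
      public BATCH take() throws InterruptedException {
        return queue.take();
      }
    }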

DataServer implements some error handling. If any of the submit() operations fails for the incoming batch, the client is notified via an ACK message with a status of FAIL. Otherwise, the client is notified with an ACK every three batches.

Raw Notes on Data Tunnel

  • A BaseRootExec subclass, such as PartitionSenderRootExec, is responsible for sending data to the receiving fragment. The sender uses a PartitionerDecorator to augment a batch with the schema-change and last-batch flags. The sender calls the flushOutgoingBatches() method to initiate sending of batches.
  • The flush method can throw an IOException, but has no means to indicate that the receiver has failed (sent a negative ACK).
  • The partition decorator creates a runnable (CustomRunnable), then hands it to the ExecutorService to send the message on another thread, then waits on a latch until the batch is actually sent.
  • The runnable invokes the code generated from PartitionerTemplate which turns around and gets the data tunnel and invokes DataTunnel.sendRecordBatch() to move the batch along.
  • DataTunnel acquires the send semaphore and invokes ReconnectingConnection.runCommand() to obtain a BasicClient connection and invoke connectAsClient() to send the batch, waiting for the response. (A sketch of this throttling appears after these notes.)
  • SendBatchAsyncListen.doRpcCall sends the batch message via a ThrottlingOutcomeListener.
  • ThrottlingOutcomeListener handles send success and failure (at the Netty level).
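
A sketch of that semaphore-based throttling, assuming a fixed number of in-flight sends (the permit count and all names are illustrative; see DataTunnel and ThrottlingOutcomeListener for the real code):

    import java.util.concurrent.Semaphore;

    // Illustrative send throttling: acquire a permit before each send,
    // release it when the ack (or a failure) comes back.
    public class ThrottledSenderSketch {
      // Bounds the number of batches in flight on this tunnel (count assumed).
      private final Semaphore sendPermits = new Semaphore(3);

      public void sendRecordBatch(Runnable doRpcCall) throws InterruptedException {
        sendPermits.acquire();   // blocks when too many batches are in flight
        try {
          doRpcCall.run();       // issue the asynchronous send
        } catch (RuntimeException e) {
          sendPermits.release(); // the send never went out; free the permit
          throw e;
        }
        // On the success path the permit is released by the outcome listener
        // when the ack arrives (see onAckReceived), not here.
      }

      // Called from the RPC layer when the receiver's Ack arrives.
      public void onAckReceived() {
        sendPermits.release();
      }
    }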

When the receiver returns an Ack, the message is routed to StatusHandler.success(), as shown below.

StatusHandler.success():

    public void success(Ack value, ByteBuf buffer) {
      sendingAccountor.decrement();
      if (value.getOk()) {
        return;
      }

      // if we didn't get ack ok, we'll need to kill the query.
      logger.error("Data not accepted downstream. Stopping future sends.");
      consumer.accept(new RpcException("Data not accepted downstream."));
    }