Skip to content

Commit

Permalink
chore(proto): Update thrift proto
Browse files Browse the repository at this point in the history
Change-Id: If792b46a6f3e4aca92d052e5a2fcfa1c17e788c4
  • Loading branch information
andeya committed Jun 25, 2019
1 parent 1aac343 commit 99cd6fd
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 110 deletions.
49 changes: 4 additions & 45 deletions proto/thriftproto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,20 @@ import (
"github.com/henrylee2cn/teleport/xfer/gzip"
)

var withoutHeader bool

type Home struct {
tp.CallCtx
}

func (h *Home) Test(arg *Test) (*Test, *tp.Rerror) {
if withoutHeader {
if h.CopyMeta().Len() != 0 {
panic("except meta is empty")
}
} else {
if string(h.PeekMeta("peer_id")) != "110" {
panic("except meta: peer_id=110")
}
if string(h.PeekMeta("peer_id")) != "110" {
panic("except meta: peer_id=110")
}
return &Test{
Author: arg.Author + "->OK",
}, nil
}

func TestBinaryProto(t *testing.T) {
withoutHeader = false
gzip.Reg('g', "gizp-5", 5)

// server
Expand Down Expand Up @@ -71,47 +62,16 @@ func TestBinaryProto(t *testing.T) {
}

func TestStructProto(t *testing.T) {
withoutHeader = false
// server
srv := tp.NewPeer(tp.PeerConfig{ListenPort: 9090})
srv.RouteCall(new(Home))
go srv.ListenAndServe(thriftproto.NewStructProtoFunc(true))
defer srv.Close()
time.Sleep(1e9)

// client
cli := tp.NewPeer(tp.PeerConfig{})
sess, err := cli.Dial(":9090", thriftproto.NewStructProtoFunc(true))
if err != nil {
t.Error(err)
}
var result Test
rerr := sess.Call("Home.Test",
&Test{Author: "henrylee2cn"},
&result,
tp.WithAddMeta("peer_id", "110"),
).Rerror()
if rerr != nil {
t.Error(rerr)
}
if result.Author != "henrylee2cn->OK" {
t.FailNow()
}
t.Logf("result:%v", result)
}

func TestStructProtoWithoutHeaders(t *testing.T) {
withoutHeader = true
// server
srv := tp.NewPeer(tp.PeerConfig{ListenPort: 9090})
srv.RouteCall(new(Home))
go srv.ListenAndServe(thriftproto.NewStructProtoFunc(false))
go srv.ListenAndServe(thriftproto.NewStructProtoFunc())
defer srv.Close()
time.Sleep(1e9)

// client
cli := tp.NewPeer(tp.PeerConfig{})
sess, err := cli.Dial(":9090", thriftproto.NewStructProtoFunc(false))
sess, err := cli.Dial(":9090", thriftproto.NewStructProtoFunc())
if err != nil {
t.Error(err)
}
Expand All @@ -136,5 +96,4 @@ test command:
```sh
go test -v -run=TestBinaryProto
go test -v -run=TestStructProto
go test -v -run=TestStructProtoWithoutHeaders
```
6 changes: 3 additions & 3 deletions proto/thriftproto/binary_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
)

func init() {
tp.Printf("Setting thrift-style service method mapper and default thrift body codec...")
tp.Printf("Setting thrift service method mapper and default thrift body codec...")
tp.SetServiceMethodMapper(tp.RPCServiceMethodMapper)
tp.SetDefaultBodyCodec(codec.ID_THRIFT)
}
Expand All @@ -33,8 +33,8 @@ func init() {
func NewBinaryProtoFunc() tp.ProtoFunc {
return func(rw tp.IOWithReadBuffer) tp.Proto {
p := &tBinaryProto{
id: 't',
name: "thrift",
id: 'b',
name: "thrift-binary",
rwCounter: utils.NewReadWriteCounter(rw),
}
p.tProtocol = thrift.NewTHeaderProtocol(&BaseTTransport{
Expand Down
31 changes: 13 additions & 18 deletions proto/thriftproto/struct_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ import (
// NOTE:
// The body codec must be thrift, directly encoded as a thrift.TStruct;
// Support the Meta, but not support the BodyCodec and XferPipe.
func NewStructProtoFunc(supportHeaders bool) tp.ProtoFunc {
func NewStructProtoFunc() tp.ProtoFunc {
return func(rw tp.IOWithReadBuffer) tp.Proto {
p := &tStructProto{
tBinaryProto: tBinaryProto{
id: 't',
name: "thrift",
rwCounter: utils.NewReadWriteCounter(rw),
},
supportHeaders: supportHeaders,
id: 's',
name: "thrift-struct",
rwCounter: utils.NewReadWriteCounter(rw),
}
p.tProtocol = thrift.NewTHeaderProtocol(&BaseTTransport{
ReadWriteCounter: p.rwCounter,
Expand All @@ -32,9 +29,11 @@ func NewStructProtoFunc(supportHeaders bool) tp.ProtoFunc {
}
}

type tStructProto struct {
tBinaryProto
supportHeaders bool
type tStructProto tBinaryProto

// Version returns the protocol's id and name.
func (t *tStructProto) Version() (byte, string) {
return t.id, t.name
}

// Pack writes the Message into the connection.
Expand Down Expand Up @@ -82,10 +81,8 @@ func (t *tStructProto) structPack(m tp.Message) error {
return err
}

if t.supportHeaders {
t.tProtocol.ClearWriteHeaders()
t.tProtocol.SetWriteHeader(HeaderMeta, goutil.BytesToString(m.Meta().QueryString()))
}
t.tProtocol.ClearWriteHeaders()
t.tProtocol.SetWriteHeader(HeaderMeta, goutil.BytesToString(m.Meta().QueryString()))

if err = t.tProtocol.WriteMessageEnd(); err != nil {
return err
Expand Down Expand Up @@ -119,10 +116,8 @@ func (t *tStructProto) structUnpack(m tp.Message) error {
return err
}

if t.supportHeaders {
headers := t.tProtocol.GetReadHeaders()
m.Meta().Parse(headers[HeaderMeta])
}
headers := t.tProtocol.GetReadHeaders()
m.Meta().Parse(headers[HeaderMeta])

m.SetBodyCodec(codec.ID_THRIFT)
return m.SetSize(uint32(t.rwCounter.Readed()))
Expand Down
48 changes: 4 additions & 44 deletions proto/thriftproto/thriftproto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,20 @@ import (
"github.com/henrylee2cn/teleport/xfer/gzip"
)

var withoutHeader bool

type Home struct {
tp.CallCtx
}

func (h *Home) Test(arg *Test) (*Test, *tp.Rerror) {
if withoutHeader {
if h.CopyMeta().Len() != 0 {
panic("except meta is empty")
}
} else {
if string(h.PeekMeta("peer_id")) != "110" {
panic("except meta: peer_id=110")
}
if string(h.PeekMeta("peer_id")) != "110" {
panic("except meta: peer_id=110")
}
return &Test{
Author: arg.Author + "->OK",
}, nil
}

func TestBinaryProto(t *testing.T) {
withoutHeader = false
gzip.Reg('g', "gizp-5", 5)

// server
Expand Down Expand Up @@ -64,47 +55,16 @@ func TestBinaryProto(t *testing.T) {
}

func TestStructProto(t *testing.T) {
withoutHeader = false
// server
srv := tp.NewPeer(tp.PeerConfig{ListenPort: 9090})
srv.RouteCall(new(Home))
go srv.ListenAndServe(thriftproto.NewStructProtoFunc(true))
defer srv.Close()
time.Sleep(1e9)

// client
cli := tp.NewPeer(tp.PeerConfig{})
sess, err := cli.Dial(":9090", thriftproto.NewStructProtoFunc(true))
if err != nil {
t.Error(err)
}
var result Test
rerr := sess.Call("Home.Test",
&Test{Author: "henrylee2cn"},
&result,
tp.WithAddMeta("peer_id", "110"),
).Rerror()
if rerr != nil {
t.Error(rerr)
}
if result.Author != "henrylee2cn->OK" {
t.FailNow()
}
t.Logf("result:%v", result)
}

func TestStructProtoWithoutHeaders(t *testing.T) {
withoutHeader = true
// server
srv := tp.NewPeer(tp.PeerConfig{ListenPort: 9090})
srv.RouteCall(new(Home))
go srv.ListenAndServe(thriftproto.NewStructProtoFunc(false))
go srv.ListenAndServe(thriftproto.NewStructProtoFunc())
defer srv.Close()
time.Sleep(1e9)

// client
cli := tp.NewPeer(tp.PeerConfig{})
sess, err := cli.Dial(":9090", thriftproto.NewStructProtoFunc(false))
sess, err := cli.Dial(":9090", thriftproto.NewStructProtoFunc())
if err != nil {
t.Error(err)
}
Expand Down

0 comments on commit 99cd6fd

Please sign in to comment.