Skip to content

Commit

Permalink
Add hold on
Browse files Browse the repository at this point in the history
  • Loading branch information
Qing committed Mar 20, 2020
1 parent 677dc9f commit a6b9a6c
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 127 deletions.
40 changes: 33 additions & 7 deletions common/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ const (
DGTWINS_OPS_WATCH = "Watch"
DGTWINS_OPS_SYNC = "Sync"
DGTWINS_OPS_DETECT = "Detect"
DGTWINS_OPS_KEEPALIVE = "Keepalive"

//State
DGTWINS_STATE_ONLINE ="online"
DGTWINS_STATE_OFFLINE ="offline"
DGTWINS_STATE_CREATED = "created"
DGTWINS_STATE_ONLINE = "online"
DGTWINS_STATE_OFFLINE = "offline"

// Resource
DGTWINS_RESOURCE_EDGE ="edge"
DGTWINS_RESOURCE_TWINS ="twins"
DGTWINS_RESOURCE_PROPERTY ="property"
DGTWINS_RESOURCE_DEVICE ="device"
Expand All @@ -58,14 +61,14 @@ const (

//Create/update/Delete/Get twins message format
type TwinMessage struct{
Twins []DeviceTwin `json:"twins"`
Twins []DigitalTwin `json:"twins"`
}

// Response message format
type TwinResponse struct{
Code int `json:"code"`
Reason string `json:"reason,omitempty"`
Twins []DeviceTwin `json:"twins,omitempty"`
Twins []DigitalTwin `json:"twins,omitempty"`
}

/*
Expand Down Expand Up @@ -107,7 +110,7 @@ func GetPropertyValue(props []TwinProperty, name string) *TwinProperty {
}

// BuildResponseMessage
func BuildResponseMessage(code int, reason string, twins []DeviceTwin) ([]byte, error){
func BuildResponseMessage(code int, reason string, twins []DigitalTwin) ([]byte, error){
resp := &TwinResponse{
Code: code,
Reason: reason,
Expand Down Expand Up @@ -135,8 +138,25 @@ func UnMarshalResponseMessage(msg *model.Message)(*TwinResponse, error){
return &rspMsg, nil
}

//UnMarshalTwinMessage.
func UnMarshalTwinMessage(msg *model.Message)(*TwinMessage, error){
var twinMsg TwinMessage

content, ok := msg.Content.([]byte)
if !ok {
return nil, errors.New("invaliad message content")
}

err := json.Unmarshal(content, &twinMsg)
if err != nil {
return nil, err
}

return &twinMsg, nil
}

// BuildTwinMessage
func BuildTwinMessage(twins []DeviceTwin) ([]byte, error){
func BuildTwinMessage(twins []DigitalTwin) ([]byte, error){
twinMsg := &TwinMessage{
Twins: twins,
}
Expand Down Expand Up @@ -165,7 +185,7 @@ func BuildModelMessage(source string, target string, operation string, resource

// GetTwinID
func GetTwinID(msg *model.Message) string {
var twins []DeviceTwin
var twins []DigitalTwin
operation := msg.GetOperation()

content, ok := msg.Content.([]byte)
Expand Down Expand Up @@ -255,3 +275,9 @@ func UnMarshalDeviceResponseMessage(msg *model.Message)(*DeviceResponse, error){

return &respMsg, nil
}

type EdgeInfo struct{
EdgeID string `json:"edgeid"`
EdgeName string `json:"edgeid,omitempty"`
Description string `json:"edgeid,omitempty"`
}
20 changes: 7 additions & 13 deletions dgtwin/dtcontext/dtcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,15 @@ func (dtc *DTContext) DGTwinIsExist (deviceID string) bool {
return true
}

func (dtc *DTContext) TwinIsOnline(deviceID string) bool {
v, _ := dtc.DGTwinList.Load(deviceID)
func (dtc *DTContext) GetTwinState(twinID string) string {
v, _ := dtc.DGTwinList.Load(twinID)
dgTwin, _ := v.(*common.DigitalTwin)

if dgTwin != nil {
if dgTwin.State == common.DGTWINS_STATE_ONLINE {
return true
}
if dgTwin == nil {
return ""
}

return false
return dgTwin.State
}

func (dtc *DTContext) BuildModelMessage(source string, target string, operation string, resource string, content interface{}) *model.Message {
Expand Down Expand Up @@ -199,13 +197,9 @@ func (dtc *DTContext) SendResponseMessage(requestMsg *model.Message, content []b
}

//SendSyncMessage Send sync conten.
func (dtc *DTContext) SendSyncMessage(we *types.WatchEvent, content []byte){
target := we.Source
resource := we.Resource

func (dtc *DTContext) SendSyncMessage(target, resource string, content []byte){
modelMsg := dtc.BuildModelMessage(types.MODULE_NAME, target,
common.DGTWINS_OPS_SYNC, resource, content)
modelMsg.SetTag(we.MsgID)
common.DGTWINS_OPS_SYNC, resource, content)
klog.Infof("Send sync message (%v)", modelMsg)

dtc.SendToModule(types.DGTWINS_MODULE_COMM, modelMsg)
Expand Down
49 changes: 30 additions & 19 deletions dgtwin/dtmodule/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (cm *CommModule) InitModule(dtc *dtcontext.DTContext, comm, heartBeat, conf
//TODO: Device should has a healthcheck.
func (cm *CommModule) Start(){
//Start loop.
checkTimeoutCh := time.After(10*time.Second)
for {
select {
case msg, ok := <-cm.recieveChan:
Expand Down Expand Up @@ -87,9 +88,10 @@ func (cm *CommModule) Start(){
klog.Infof("%s module stopped", cm.Name())
return
}
case <-time.After(60*time.Second):
case <-checkTimeoutCh:
//check the MessageCache for response.
cm.dealMessageTimeout()
checkTimeoutCh = time.After(10*time.Second)
}
}
}
Expand All @@ -113,13 +115,16 @@ func (cm *CommModule) sendMessageToDevice(msg *model.Message) {

//sendMessageToHub
func (cm *CommModule) sendMessageToHub(msg *model.Message) {
//cache this message for confirm recieve the response.
id := msg.GetID()
_, exist := cm.context.MessageCache.Load(id)
if !exist {
cm.context.MessageCache.Store(id, msg)
}
operation := msg.GetOperation()

if strings.Compare(common.DGTWINS_OPS_RESPONSE, operation) != 0 {
//cache this message for confirm recieve the response.
id := msg.GetID()
_, exist := cm.context.MessageCache.Load(id)
if !exist {
cm.context.MessageCache.Store(id, msg)
}
}
//send message to message hub.
cm.context.Send(common.HubModuleName, msg)
}
Expand Down Expand Up @@ -152,7 +157,7 @@ func (cm *CommModule) dealMessageTimeout() {
timeStamp := msg.GetTimestamp()/1e3
now := time.Now().UnixNano() / 1e9
if now - timeStamp >= types.DGTWINS_MSG_TIMEOUT {
if strings.Compare(common.DeviceName, target) == 0 {
if strings.Contains(target, common.DeviceName) {
if strings.Compare(common.DGTWINS_OPS_RESPONSE, operation) != 0 {
//mark device status is offline.
//send package and tell twin module, device is offline.
Expand All @@ -173,26 +178,32 @@ func (cm *CommModule) dealMessageTimeout() {
cm.context.MessageCache.Delete(key)
return true
}else{
if strings.Compare(common.DeviceName, target) == 0 &&
strings.Compare(common.DGTWINS_OPS_SYNC, operation) == 0 {
if strings.Contains(target, common.DeviceName) &&
strings.Compare(common.DGTWINS_OPS_DETECT, operation) == 0 {
// this is a ping message to device, then, we delete this mark
// and make this state as offline.
twinID := common.GetTwinID(msg)
dgtwin := &common.DeviceTwin{
ID: twinID,
State: common.DGTWINS_STATE_OFFLINE,
}
twinState := cm.context.GetTwinState(twinID)
if twinState != common.DGTWINS_STATE_CREATED &&
twinState != common.DGTWINS_STATE_OFFLINE {

dgtwin := &common.DeviceTwin{
ID: twinID,
State: common.DGTWINS_STATE_OFFLINE,
}

msgContent, err := common.BuildDeviceMessage(dgtwin)
if err == nil {
modelMsg := common.BuildModelMessage(types.MODULE_NAME, types.MODULE_NAME, common.DGTWINS_OPS_UPDATE,
msgContent, err := common.BuildDeviceMessage(dgtwin)
if err == nil {
modelMsg := common.BuildModelMessage(types.MODULE_NAME, types.MODULE_NAME, common.DGTWINS_OPS_UPDATE,
common.DGTWINS_RESOURCE_TWINS, msgContent)
cm.context.SendToModule(types.DGTWINS_MODULE_TWINS, modelMsg)
cm.context.SendToModule(types.DGTWINS_MODULE_TWINS, modelMsg)
}
}

cm.context.MessageCache.Delete(key)
klog.Infof("### Detect Device(%s) is offline", twinID)
}else {
//resend this message.
klog.Infof("### Resend this message...")
cm.context.SendToModule(types.DGTWINS_MODULE_COMM, msg)
}
return true
Expand Down
50 changes: 29 additions & 21 deletions dgtwin/dtmodule/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (dm *TwinModule) InitModule(dtc *dtcontext.DTContext, comm, heartBeat, conf

//Start Device module
func (dm *TwinModule) Start(){
KeepaliveCh := time.After(120 *time.Second)
//Start loop.
for {
select {
Expand Down Expand Up @@ -91,9 +92,11 @@ func (dm *TwinModule) Start(){
klog.Infof("%s module stopped", dm.Name())
return
}
case <-time.After(120*time.Second):
case <-KeepaliveCh:
//Check & sync device's state.
klog.Infof("####### ping device #############")
dm.PingDevice()
KeepaliveCh = time.After(120 *time.Second)
}
}
}
Expand All @@ -120,7 +123,6 @@ func (dm *TwinModule) twinsCreateHandle(msg *model.Message) (interface{}, error)
return nil, err
}

twins := make([]common.DeviceTwin, 0)
//get all requested twins
for key, _ := range twinMsg.Twins {
twin := &twinMsg.Twins[key]
Expand All @@ -130,7 +132,7 @@ func (dm *TwinModule) twinsCreateHandle(msg *model.Message) (interface{}, error)
if !exist {
dgTwin := &common.DigitalTwin{
ID: twinID,
State: common.DGTWINS_STATE_OFFLINE,
State: common.DGTWINS_STATE_CREATED,
}

//Create DGTwin is always success since it just create data startuctre
Expand All @@ -140,19 +142,21 @@ func (dm *TwinModule) twinsCreateHandle(msg *model.Message) (interface{}, error)
var deviceMutex sync.Mutex
dm.context.DGTwinMutex.Store(twinID, &deviceMutex)
//save to sqlite, implement in future.
//TODO:

twins = append(twins, common.DeviceTwin{ID: twinID})
//TODO:

//detect the physical device
// send broadcast to all device, and wait (own this ID) device's response,
// if it has reply, then will report all property of this device.
dm.context.SendMessage2Device(common.DGTWINS_OPS_DETECT, twin)
deviceTwin := &common.DeviceTwin{
ID: twinID,
State: common.DGTWINS_STATE_CREATED,
}
dm.context.SendMessage2Device(common.DGTWINS_OPS_DETECT, deviceTwin)
}
}

//Send response.
msgContent, err := common.BuildResponseMessage(common.RequestSuccessCode, "Success", twins)
msgContent, err := common.BuildResponseMessage(common.RequestSuccessCode, "Success", twinMsg.Twins)
if err != nil {
//Internal err
return nil, err
Expand Down Expand Up @@ -199,10 +203,17 @@ func (dm *TwinModule) deviceUpdateHandle(msg *model.Message ) (interface{}, erro
dm.context.Unlock(twinID)

if err == nil {
klog.Infof("######### (%s) is online ##########", twinID)
klog.Infof("######### (%s) is %s ##########", twinID, oldTwin.State)
klog.Infof("######### Device information update successful ##########")

//notify others about device is online
twins := []common.DigitalTwin{*oldTwin}
msgContent, err := common.BuildTwinMessage(twins)
if err != nil {
return nil, err
}
//Send the Sync message.
dm.context.SendSyncMessage(common.CloudName, common.DGTWINS_RESOURCE_TWINS, msgContent)
} else {
//Internel err!
}
Expand Down Expand Up @@ -320,7 +331,8 @@ func (dm *TwinModule) deviceDeleteHandle(msg *model.Message) (interface{}, error
}

//notify the device delete link with dgtwin.
dm.context.SendMessage2Device(common.DGTWINS_OPS_DELETE, dgTwin)
devTwin := &common.DeviceTwin{ID: twinID}
dm.context.SendMessage2Device(common.DGTWINS_OPS_DELETE, devTwin)
dm.context.SendResponseMessage(msg, msgContent)
}

Expand All @@ -332,7 +344,7 @@ func (dm *TwinModule) deviceDeleteHandle(msg *model.Message) (interface{}, error
// If request twin is not exit, this func will return empty list.
func (dm *TwinModule) deviceGetHandle(msg *model.Message) (interface{}, error) {
var twinMsg common.TwinMessage
twins := make([]common.DeviceTwin, 0)
twins := make([]common.DigitalTwin, 0)

content, ok := msg.Content.([]byte)
if !ok {
Expand All @@ -356,10 +368,7 @@ func (dm *TwinModule) deviceGetHandle(msg *model.Message) (interface{}, error) {
return nil, errors.New("invalud digital twin type")
}

//convert digital twin to device twin.
deviceTwin := dm.Digital2Device(savedTwin)

twins = append(twins, *deviceTwin)
twins = append(twins, *savedTwin)
}else {
// not exist, ignore.
}
Expand Down Expand Up @@ -404,6 +413,7 @@ func (dm *TwinModule) deviceResponseHandle(msg *model.Message) (interface{}, err

klog.Infof("Device is online, update device with (%v)", deviceMsg)
dm.context.SendToModule(types.DGTWINS_MODULE_COMM, deviceMsg)

case common.DeviceNotReady:
}

Expand All @@ -425,14 +435,12 @@ func (dm *TwinModule) deviceResponseHandle(msg *model.Message) (interface{}, err
func (dm *TwinModule) PingDevice() {
dm.context.DGTwinList.Range(func(key, value interface{}) bool {
twinID := key.(string)

msgContent, err := common.BuildDeviceMessage(&common.DeviceTwin{ID: twinID})
if err == nil {
modelMsg := dm.context.BuildModelMessage(types.MODULE_NAME, common.DeviceName,
common.DGTWINS_OPS_SYNC, common.DGTWINS_RESOURCE_DEVICE, msgContent)
dm.context.SendToModule(types.DGTWINS_MODULE_COMM, modelMsg)
twin := &common.DeviceTwin{
ID: twinID,
State: dm.context.GetTwinState(twinID),
}

dm.context.SendMessage2Device(common.DGTWINS_OPS_DETECT, twin)
return true
})
}
Expand Down
Loading

0 comments on commit a6b9a6c

Please sign in to comment.