package device import ( "encoding/base64" "encoding/json" "errors" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" log "github.com/sirupsen/logrus" "github.com/smbrave/goutil" "github.com/spf13/cast" "github.com/tidwall/gjson" "strings" "sync" "time" ) var ( VzenithTypeRaw = "raw" VzenithTypeIvsResult = "ivs_result" VzenithTypeSerialData = "serial_data" ) type VzenithR5 struct { sync.Mutex brand string model string deviceId string client mqtt.Client callback Callback } func NewVzenithR5(deviceId string, client mqtt.Client, call Callback) *VzenithR5 { u := &VzenithR5{ brand: BrandVzenith, model: "r5", deviceId: deviceId, client: client, callback: call, } if call != nil { var token mqtt.Token = nil if deviceId == "#" { token = client.Subscribe(fmt.Sprintf("%s/%s/%s", u.brand, u.model, deviceId), 2, u.Callback) } else { token = client.Subscribe(fmt.Sprintf("%s/%s/%s/#", u.brand, u.model, deviceId), 2, u.Callback) } if err := token.Error(); err != nil { log.Errorf("brand[%s] model[%s] device[%s] Subscribe error:%s", u.brand, u.model, deviceId, err.Error()) } } return u } func (u *VzenithR5) IvsTrigger() error { if u.deviceId == "#" { return errors.New("no deviceId") } params := make(map[string]interface{}) params["id"] = cast.ToString(goutil.GetBigID(0, 0)) params["sn"] = u.deviceId params["name"] = "ivs_trigger" params["version"] = "1.0" params["timestamp"] = time.Now().Unix() params["payload"] = map[string]interface{}{ "type": "ivs_trigger", "body": map[string]interface{}{}, } payload := goutil.EncodeJSON(params) topic := fmt.Sprintf("%s/%s/%s/down/ivs_trigger", u.brand, u.model, u.deviceId) publishACK := u.client.Publish(topic, 2, false, payload) publishACK.WaitTimeout(time.Second) return publishACK.Error() } func (u *VzenithR5) KeepAlive() error { params := make(map[string]interface{}) params["id"] = cast.ToString(goutil.GetBigID(0, 0)) params["sn"] = u.deviceId params["name"] = "get_device_timestamp" params["version"] = "1.0" params["timestamp"] = time.Now().Unix() params["payload"] = map[string]interface{}{ "type": "get_device_timestamp", "body": map[string]interface{}{}, } payload := goutil.EncodeJSON(params) topic := fmt.Sprintf("%s/%s/%s/down/get_device_timestamp", u.brand, u.model, u.deviceId) publishACK := u.client.Publish(topic, 2, false, payload) publishACK.WaitTimeout(time.Second) return publishACK.Error() } func (u *VzenithR5) Operate(string) error { return nil } func (u *VzenithR5) SendSerialData(channel int, data []byte) error { dataBase64 := base64.StdEncoding.EncodeToString(data) dataLen := len(dataBase64) params := make(map[string]interface{}) params["id"] = cast.ToString(goutil.GetBigID(0, 0)) params["sn"] = u.deviceId params["name"] = "serial_data" params["version"] = "1.0" params["timestamp"] = time.Now().Unix() params["payload"] = map[string]interface{}{ "type": "serial_data", "body": map[string]interface{}{ "serialData": []map[string]interface{}{ { "serialChannel": channel, "data": dataBase64, "dataLen": dataLen, }, }, }, } payload := goutil.EncodeJSON(params) topic := fmt.Sprintf("%s/%s/%s/down/serial_data", u.brand, u.model, u.deviceId) publishACK := u.client.Publish(topic, 2, false, payload) publishACK.WaitTimeout(time.Second) return publishACK.Error() } func (u *VzenithR5) getDeviceId(topic string) string { devId := u.deviceId if devId == "#" { fields := strings.Split(topic, "/") if len(fields) >= 3 { devId = fields[2] } } return devId } func (s *VzenithR5) Callback(client mqtt.Client, message mqtt.Message) { s.Lock() defer s.Unlock() topic := message.Topic() payload := message.Payload() log.Infof("[vzenith][R5] deviceId[%s] topic[%s] payloyad[%s]", s.deviceId, topic, string(payload)) if strings.HasSuffix(topic, "ivs_result") { s.parseIvsResult(cast.ToString(message.MessageID()), topic, payload) return } if strings.HasSuffix(topic, "up/serial_data") { s.parseSerialData(cast.ToString(message.MessageID()), topic, payload) return } if strings.HasSuffix(topic, "down/get_device_timestamp") { return } var callMsg Message callMsg.MsgType = VzenithTypeRaw callMsg.MsgTime = time.Now().Unix() callMsg.MsgId = cast.ToString(message.MessageID()) callMsg.DeviceId = s.getDeviceId(topic) callMsg.Topic = topic callMsg.Data = make(map[string]interface{}) json.Unmarshal([]byte(payload), &callMsg.Data) s.callback(&callMsg) } func (s *VzenithR5) parseSerialData(msgId, topic string, payload []byte) { mp := make(map[string]interface{}) if err := json.Unmarshal(payload, &mp); err != nil { log.Errorf("paylooad[%s] error :%s", string(payload), err.Error()) return } g := gjson.Parse(string(payload)) message := &Message{} message.DeviceId = s.getDeviceId(topic) message.Topic = topic message.MsgType = VzenithTypeSerialData message.MsgTime = time.Now().Unix() message.MsgTime = cast.ToInt64(mp["timestamp"]) message.MsgId = msgId data := g.Get("payload.body.SerialData.data") channel := g.Get("payload.body.SerialData.serialChannel").Int() d, _ := base64.StdEncoding.DecodeString(data.String()) message.Data = make(map[string]interface{}) message.Data["data"] = string(d) message.Data["channel"] = channel s.callback(message) } func (s *VzenithR5) parseIvsResult(msgId, topic string, payload []byte) { mp := make(map[string]interface{}) if err := json.Unmarshal(payload, &mp); err != nil { log.Errorf("paylooad[%s] error :%s", string(payload), err.Error()) return } g := gjson.Parse(string(payload)) message := &Message{} message.DeviceId = s.getDeviceId(topic) message.Topic = topic message.MsgType = VzenithTypeIvsResult message.MsgTime = time.Now().Unix() message.MsgTime = cast.ToInt64(mp["timestamp"]) message.MsgId = msgId license := g.Get("payload.AlarmInfoPlate.result.PlateResult.license") lic, _ := base64.StdEncoding.DecodeString(license.String()) message.Data = make(map[string]interface{}) message.Data["license"] = string(lic) s.callback(message) }