Compare commits
No commits in common. "542935cb8fc42747060eea645fcd24b8d12a789d" and "69f04c09a65e30f78f6d453df731241432a2f249" have entirely different histories.
542935cb8f
...
69f04c09a6
|
|
@ -237,32 +237,12 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
|
||||||
con_id := uuid.New().String()
|
con_id := uuid.New().String()
|
||||||
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
|
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
|
||||||
//加入设备在线连接集合
|
//加入设备在线连接集合
|
||||||
worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5)
|
worker.SetRedisSetAdd(online_conn_key, con_id)
|
||||||
defer pubsub.Close()
|
defer pubsub.Close()
|
||||||
defer ws.Close()
|
defer ws.Close()
|
||||||
ch := pubsub.Channel()
|
ch := pubsub.Channel()
|
||||||
var check_cnt int
|
var check_cnt int
|
||||||
for {
|
for msg := range ch {
|
||||||
select {
|
|
||||||
case msg, ok := <-ch:
|
|
||||||
if !ok {
|
|
||||||
// 通道关闭或没有消息,每秒尝试检测连接
|
|
||||||
for {
|
|
||||||
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
|
|
||||||
if err != nil {
|
|
||||||
clientsMux.Lock()
|
|
||||||
clients[ws] = false
|
|
||||||
clientsMux.Unlock()
|
|
||||||
// 查看是否还有其他连接,没有则设置 is_play 为 0
|
|
||||||
if worker.IsContainKey(online_conn_key) == false {
|
|
||||||
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
|
|
||||||
fmt.Println("device_id:", device_id, " has set is_play to 0")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var res3 []byte
|
var res3 []byte
|
||||||
var msgObj proto.Message
|
var msgObj proto.Message
|
||||||
if msg.Payload != "" {
|
if msg.Payload != "" {
|
||||||
|
|
@ -290,9 +270,12 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
|
||||||
clientsMux.Unlock()
|
clientsMux.Unlock()
|
||||||
fmt.Println("send message to client err:", err2)
|
fmt.Println("send message to client err:", err2)
|
||||||
worker.SetRedisSetRemove(online_conn_key, con_id)
|
worker.SetRedisSetRemove(online_conn_key, con_id)
|
||||||
return
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//查看是否还有其他连接,没有则设置is_play为0
|
||||||
|
if worker.IsContainKey(online_conn_key) == false {
|
||||||
|
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
|
||||||
|
fmt.Println("device_id:", device_id, " has set is_play to 0")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -292,22 +292,6 @@ func SetRedisSetAdd(key string, value string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// 设置set,添加元素
|
|
||||||
func SetRedisSetAddWithExpire(key string, value string, expire time.Duration) bool {
|
|
||||||
ctx := context.Background()
|
|
||||||
err := RedisClient.SAdd(ctx, key, value).Err()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Error setting key: %v", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
err = RedisClient.Expire(ctx, key, expire).Err()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Error setting key: %v", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// 设置set,删除元素
|
// 设置set,删除元素
|
||||||
func SetRedisSetRemove(key string, value string) bool {
|
func SetRedisSetRemove(key string, value string) bool {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue