Jelajahi Sumber

新增:摄像头通道状态订阅主题数据

Huangzf 1 tahun lalu
induk
melakukan
881bc71d5c

+ 45 - 0
cq-fire/src/main/java/org/jetlinks/pro/cqfire/listener/MediaChannelEntityListener.java

@@ -0,0 +1,45 @@
+package org.jetlinks.pro.cqfire.listener;
+
+
+import org.hswebframework.web.crud.events.EntityBeforeModifyEvent;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.pro.media.entity.MediaChannelEntity;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.List;
+
+@Component
+public class MediaChannelEntityListener {
+
+    @Autowired
+    EventBus eventBus;
+
+    @EventListener
+    public void handleEntityBeforeModifyEvent(EntityBeforeModifyEvent<MediaChannelEntity> event) {
+        if (!CollectionUtils.isEmpty(event.getAfter())) {
+            event.async(
+                publishMediaChannelStatus(event.getAfter())
+            );
+        }
+    }
+
+    public Mono<Void> publishMediaChannelStatus(List<MediaChannelEntity> list){
+        return Flux.fromIterable(list)
+            .flatMap(mediaChannelEntity -> eventBus.publish("/media-channel-status",
+                new HashMap<String, String>() {{
+                    put("deviceId",mediaChannelEntity.getDeviceId());
+                    put("channelId", mediaChannelEntity.getChannelId());
+                    put("status", mediaChannelEntity.getStatus().getValue());
+                }}
+            )).then();
+    }
+
+
+
+}

+ 46 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/pro/device/message/MediaChannelStatusSubscriptionProvider.java

@@ -0,0 +1,46 @@
+package org.jetlinks.pro.device.message;
+
+import lombok.AllArgsConstructor;
+import org.jetlinks.core.Payload;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.pro.gateway.external.SubscribeRequest;
+import org.jetlinks.pro.gateway.external.SubscriptionProvider;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+
+@Component
+@AllArgsConstructor
+public class MediaChannelStatusSubscriptionProvider implements SubscriptionProvider {
+
+
+    private final EventBus eventBus;
+
+    @Override
+    public String id() {
+        return "media-channel-status";
+    }
+
+    @Override
+    public String name() {
+        return "推送摄像头设备通道状态";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/media-channel-status"
+        };
+    }
+
+    @Override
+    public Flux<?> subscribe(SubscribeRequest request) {
+        return eventBus.subscribe(
+            Subscription.builder()
+                .randomSubscriberId()
+                .topics("/media-channel-status")
+                .features(Subscription.Feature.local, Subscription.Feature.broker).build())
+            .map(Payload::bodyToJson);
+    }
+}