Explorar el Código

fix: 增加释放ssrc的逻辑

luoyuhong hace 3 meses
padre
commit
1bd7bf1a3a

+ 68 - 11
expands-components/jetlinks-media/src/main/java/org/jetlinks/pro/media/gb28181/RedisSsrcPool.java

@@ -1,14 +1,14 @@
 package org.jetlinks.pro.media.gb28181;
 
+import java.util.ArrayList;
+import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
 import org.springframework.data.redis.core.ReactiveRedisOperations;
 import org.springframework.data.redis.core.ReactiveRedisTemplate;
 import org.springframework.data.redis.serializer.RedisSerializationContext;
 import reactor.core.publisher.Mono;
-
-import java.util.ArrayList;
-import java.util.List;
+import reactor.core.scheduler.Schedulers;
 
 @Slf4j
 public class RedisSsrcPool implements SsrcPool {
@@ -63,17 +63,74 @@ public class RedisSsrcPool implements SsrcPool {
     }
 
     public Mono<Void> release(String ssrc) {
-        return redis
-            .opsForSet()
-            .add(redisKey, ssrc.substring(prefix.length() + 1))
-            .then();
+    String ssrcValue = ssrc.substring(prefix.length() + 1);
+    return redis
+        .opsForSet()
+        .add(redisKey, ssrcValue)
+        .publishOn(Schedulers.boundedElastic())
+        .doOnSuccess(
+            result -> {
+              if (result > 0) {
+                log.info(
+                    "ssrc release success: SSRC {} released and added back to the pool. Redis Key: {}).",
+                    ssrcValue,
+                    redisKey);
+              } else {
+                log.error(
+                    "ssrc release failed: Failed to release SSRC {}. Redis Key: {}",
+                    ssrcValue,
+                    redisKey);
+              }
+              redis
+                  .opsForSet()
+                  .isMember(redisKey, ssrcValue)
+                  .doOnNext(
+                      isMember -> {
+                        if (isMember) {
+                          log.warn(
+                              "SSRC {} is already present in Redis Set for key {}.",
+                              ssrcValue,
+                              redisKey);
+                        } else {
+                          log.error(
+                              "SSRC {} is not present in Redis Set for key {} but failed to add.",
+                              ssrcValue,
+                              redisKey);
+                        }
+                      })
+                  .doOnError(
+                      err ->
+                          log.error(
+                              "Error checking SSRC presence in Redis Set for key {}: {}",
+                              redisKey,
+                              err.getMessage()))
+                  .subscribe();
+            })
+        .doOnError(err -> log.error("Error during SSRC add operation", err))
+        .then();
     }
 
     private Mono<String> pollSsrc() {
-        return redis
-            .opsForSet()
-            .pop(redisKey)
-            .switchIfEmpty(Mono.error(() -> new IllegalStateException("ssrc pool is empty")));
+    return redis
+        .opsForSet()
+        .pop(redisKey)
+        .publishOn(Schedulers.boundedElastic())
+        .doOnSuccess(
+            ssrc -> {
+              if (ssrc != null) {
+                log.info("Popped SSRC: {}, key is : {}", ssrc, redisKey);
+              } else {
+                log.error("Failed to pop SSRC: Pool is empty, key is: {}", redisKey);
+              }
+            })
+        .doOnError(
+            err ->
+                log.error(
+                    "Error during SSRC pop operation for key: {}, error: {}",
+                    redisKey,
+                    err.getMessage(),
+                    err))
+        .switchIfEmpty(Mono.error(() -> new IllegalStateException("ssrc pool is empty")));
     }
 
 }

+ 38 - 19
expands-components/jetlinks-media/src/main/java/org/jetlinks/pro/media/server/DefaultStreamManager.java

@@ -1,12 +1,14 @@
 package org.jetlinks.pro.media.server;
 
+import java.time.Duration;
+import java.util.*;
+import javax.annotation.Nonnull;
 import lombok.AllArgsConstructor;
 import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
 import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql;
 import org.hswebframework.web.crud.events.EntityModifyEvent;
 import org.hswebframework.web.crud.events.EntitySavedEvent;
 import org.jetlinks.core.device.DeviceRegistry;
-import org.jetlinks.pro.media.MediaConstants;
 import org.jetlinks.pro.media.entity.MediaServerEntity;
 import org.jetlinks.pro.media.entity.MediaStreamEntity;
 import org.jetlinks.pro.media.event.StreamStateEvent;
@@ -16,13 +18,6 @@ import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import javax.annotation.Nonnull;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 @Component
 @AllArgsConstructor
 public class DefaultStreamManager implements StreamManager {
@@ -171,17 +166,41 @@ public class DefaultStreamManager implements StreamManager {
             .map(MediaStreamEntity::toDeviceStreamInfo);
     }
 
-    @Override
-    public Mono<Integer> removeNoWatchingStream(String serverId, @Nonnull Duration time, @Nonnull String... streamType) {
-        return repository
-            .createDelete()
-            .where(MediaStreamEntity::getMediaServerId, serverId)
-            .lte(MediaStreamEntity::getPlayers, 0)
-            .lt(MediaStreamEntity::getLastPlayStateTime, System.currentTimeMillis() - time.toMillis())
-            .when(streamType.length > 0, q -> q.in(MediaStreamEntity::getType, Arrays.asList(streamType)))
-            .execute();
-    }
-
+  @Override
+  public Flux<String> removeNoWatchingStream(
+      String serverId, @Nonnull Duration time, @Nonnull String... streamType) {
+    return repository
+        .createQuery()
+        .where(MediaStreamEntity::getMediaServerId, serverId)
+        .lte(MediaStreamEntity::getPlayers, 0) // 没有观众
+        .lt(
+            MediaStreamEntity::getLastPlayStateTime,
+            System.currentTimeMillis() - time.toMillis()) // 超过指定时长无人观看
+        .when(
+            streamType.length > 0,
+            q -> q.in(MediaStreamEntity::getType, Arrays.asList(streamType))) // 根据流类型过滤
+        .fetch() // 获取流记录
+        .flatMap(
+            stream -> {
+              // 提取 streamInfo
+              StreamInfo streamInfo = stream.getStreamInfo();
+              String ssrc = streamInfo != null ? streamInfo.getSsrc() : null;
+
+              // 执行删除操作
+              return repository
+                  .createDelete()
+                  .where(MediaStreamEntity::getId, stream.getId()) // 删除对应的实体
+                  .execute()
+                  .thenReturn(Objects.requireNonNull(ssrc)); // 删除后返回 ssrc
+            });
+    /*        return repository
+    .createDelete()
+    .where(MediaStreamEntity::getMediaServerId, serverId)
+    .lte(MediaStreamEntity::getPlayers, 0)
+    .lt(MediaStreamEntity::getLastPlayStateTime, System.currentTimeMillis() - time.toMillis())
+    .when(streamType.length > 0, q -> q.in(MediaStreamEntity::getType, Arrays.asList(streamType)))
+    .execute();*/
+  }
 
     /**
      * 监听流媒体服务配置变化。移除对应 MediaStreamEntity。播放时会生成新的流信息。

+ 15 - 17
expands-components/jetlinks-media/src/main/java/org/jetlinks/pro/media/server/StreamManager.java

@@ -1,13 +1,12 @@
 package org.jetlinks.pro.media.server;
 
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * 流管理器
@@ -98,18 +97,17 @@ public interface StreamManager {
      */
     Flux<DeviceStreamInfo> findNoWatchingStream(String serverId, Duration time, @Nonnull String... streamType);
 
-    /**
-     * 删除无人观看的流信息
-     *
-     * @param serverId    服务ID
-     * @param time        时间
-     * @param streamTypes 流类型 {@link StreamInfo#TYPE_LIVE}
-     * @return 删除的流数量
-     * @see StreamInfo
-     */
-    Mono<Integer> removeNoWatchingStream(@Nullable String serverId,
-                                         @Nonnull Duration time,
-                                         @Nonnull String... streamTypes);
+  /**
+   * 删除无人观看的流信息
+   *
+   * @param serverId 服务ID
+   * @param time 时间
+   * @param streamTypes 流类型 {@link StreamInfo#TYPE_LIVE}
+   * @return 删除的流数量
+   * @see StreamInfo
+   */
+  Flux<String> removeNoWatchingStream(
+      @Nullable String serverId, @Nonnull Duration time, @Nonnull String... streamTypes);
 
     /**
      * 删除流信息

+ 79 - 30
expands-components/jetlinks-media/src/main/java/org/jetlinks/pro/media/service/MediaGatewayService.java

@@ -1,8 +1,14 @@
 package org.jetlinks.pro.media.service;
 
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
+import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.crud.events.EntityDeletedEvent;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.core.event.EventBus;
@@ -16,25 +22,23 @@ import org.jetlinks.pro.media.entity.MediaGatewayEntity;
 import org.jetlinks.pro.media.entity.MediaStreamEntity;
 import org.jetlinks.pro.media.enums.ChannelSyncState;
 import org.jetlinks.pro.media.enums.GatewayStatus;
-import org.jetlinks.pro.media.gb28181.cascade.CascadeStreamInfo;
+import org.jetlinks.pro.media.gb28181.RedisSsrcPool;
+import org.jetlinks.pro.media.gb28181.SsrcPool;
 import org.jetlinks.pro.media.server.DeviceStreamInfo;
 import org.jetlinks.pro.media.server.StreamInfo;
 import org.jetlinks.pro.media.server.StreamManager;
+import org.jetlinks.pro.media.sip.SipProperties;
 import org.reactivestreams.Publisher;
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.context.event.EventListener;
+import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import javax.annotation.PreDestroy;
-import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 @Service
 @Slf4j
 public class MediaGatewayService extends GenericReactiveCrudService<MediaGatewayEntity, String> implements CommandLineRunner {
@@ -52,14 +56,21 @@ public class MediaGatewayService extends GenericReactiveCrudService<MediaGateway
 
     private MediaChannelService channelService;
 
-    public MediaGatewayService(EventBus eventBus,
-                               StreamManager streamManager,
-                               ObjectProvider<MediaGatewayProvider> providers,
-                               ObjectProvider<MediaGateway> fixedGateway,
-                               MediaChannelService channelService) {
+  private final ReactiveRedisConnectionFactory connectionFactory;
+
+  private SsrcPool ssrcPool;
+
+  public MediaGatewayService(
+      EventBus eventBus,
+      StreamManager streamManager,
+      ObjectProvider<MediaGatewayProvider> providers,
+      ObjectProvider<MediaGateway> fixedGateway,
+      MediaChannelService channelService,
+      ReactiveRedisConnectionFactory connectionFactory) {
         this.eventBus = eventBus;
         this.streamManager = streamManager;
         this.channelService = channelService;
+    this.connectionFactory = connectionFactory;
         for (MediaGatewayProvider provider : providers) {
             gatewayProviders.put(provider.getId(), provider);
         }
@@ -68,6 +79,21 @@ public class MediaGatewayService extends GenericReactiveCrudService<MediaGateway
         }
     }
 
+  @PostConstruct
+  public void initSsrcPool() {
+    // fixme
+    this.getGatewayByServerId("gb28181_MediaServer")
+        .flatMap(
+            config -> {
+              SipProperties sipProperties =
+                  FastBeanCopier.copy(config.getConfiguration(), new SipProperties());
+              this.ssrcPool =
+                  new RedisSsrcPool(connectionFactory, sipProperties.getSipId().substring(3, 8));
+              return Mono.empty();
+            })
+        .subscribe();
+  }
+
     @Override
     public Mono<SaveResult> save(Publisher<MediaGatewayEntity> entityPublisher) {
         return Flux.from(entityPublisher)
@@ -172,15 +198,18 @@ public class MediaGatewayService extends GenericReactiveCrudService<MediaGateway
         return Mono.justOrEmpty(gateways.get(id));
     }
 
-    //监听来自其他集群的视频流关闭事件
-    @Subscribe(topics = "/media/*/*/*/closed", features = Subscription.Feature.broker)
-    public Mono<Void> closeStream(DeviceStreamInfo stream) {
-        return this
-            .getGateway(stream.getGatewayId())
-            .flatMap(gateway -> gateway
-                         .closeStream(stream)
-                         .then()
-//                .flatMap(success -> streamManager.removeStream(stream.getDeviceId(), stream.getStreamId()))
+  public Mono<MediaGatewayEntity> getGatewayByServerId(String serverId) {
+    return createQuery().where(MediaGatewayEntity::getMediaServerId, serverId).fetchOne(); // 返回单个结果
+  }
+
+  // 监听来自其他集群的视频流关闭事件
+  @Subscribe(topics = "/media/*/*/*/closed", features = Subscription.Feature.broker)
+  public Mono<Void> closeStream(DeviceStreamInfo stream) {
+    return this.getGateway(stream.getGatewayId())
+        .flatMap(
+            gateway -> gateway.closeStream(stream).then()
+            //                .flatMap(success -> streamManager.removeStream(stream.getDeviceId(),
+            // stream.getStreamId()))
             );
     }
 
@@ -223,16 +252,36 @@ public class MediaGatewayService extends GenericReactiveCrudService<MediaGateway
     @Override
     public void run(String... args) {
 
-        if (autoCloseNoWatching) {
-            intervalJob = Flux
-                .interval(Duration.ofMinutes(1))
-                .flatMap(ignore -> streamManager
-                    //只定时删除直播,回放
-                    .removeNoWatchingStream(null, Duration.ofMinutes(5), StreamInfo.TYPE_LIVE, StreamInfo.TYPE_PLAYBACK)
-                    .onErrorResume(err -> Mono.empty())
-                    .then())
-                .subscribe();
-        }
+    if (autoCloseNoWatching) {
+      intervalJob =
+          Flux.interval(Duration.ofMinutes(1))
+              .flatMap(
+                  ignore ->
+                      streamManager
+                          // 只定时删除直播,回放
+                          .removeNoWatchingStream(
+                              null,
+                              Duration.ofMinutes(5),
+                              StreamInfo.TYPE_LIVE,
+                              StreamInfo.TYPE_PLAYBACK)
+                          .flatMap(
+                              ssrc -> {
+                                if (ssrc == null || ssrc.isEmpty()) {
+                                  return Mono.empty();
+                                }
+
+                                return ssrcPool
+                                    .release(ssrc)
+                                    .onErrorResume(
+                                        err -> {
+                                          log.error("Error releasing ssrc: " + ssrc, err);
+                                          return Mono.empty();
+                                        });
+                              })
+                          .onErrorResume(err -> Mono.empty())
+                          .then())
+              .subscribe();
+    }
 
         createQuery()
             .where(MediaGatewayEntity::getStatus, GatewayStatus.enabled)