Bläddra i källkod

fix: 增加日志调

luoyuhong 3 månader sedan
förälder
incheckning
41acc66e61

+ 74 - 66
expands-components/jetlinks-media/src/main/java/org/jetlinks/pro/media/gb28181/DefaultGB28181MediaDeviceOperations.java

@@ -6,6 +6,22 @@ import gov.nist.javax.sdp.MediaDescriptionImpl;
 import gov.nist.javax.sdp.fields.MediaField;
 import gov.nist.javax.sdp.parser.MediaFieldParser;
 import gov.nist.javax.sip.stack.SIPDialog;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.sdp.MediaDescription;
+import javax.sdp.SdpFactory;
+import javax.sdp.TimeDescription;
+import javax.sip.ClientTransaction;
+import javax.sip.Transaction;
+import javax.sip.header.CSeqHeader;
+import javax.sip.message.Request;
+import javax.sip.message.Response;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
@@ -46,23 +62,6 @@ import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.sdp.MediaDescription;
-import javax.sdp.SdpFactory;
-import javax.sdp.TimeDescription;
-import javax.sip.ClientTransaction;
-import javax.sip.Transaction;
-import javax.sip.header.CSeqHeader;
-import javax.sip.message.Request;
-import javax.sip.message.Response;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-
 public class DefaultGB28181MediaDeviceOperations implements GB28181MediaDeviceOperations {
     private final String gatewayId;
     @Getter
@@ -260,57 +259,66 @@ public class DefaultGB28181MediaDeviceOperations implements GB28181MediaDeviceOp
                                                                                           String type,
                                                                                           Function4<String, String, Integer, StreamMode, byte[]> payloadBuilder,
                                                                                           boolean localPlayer) {
-        return ssrcPool
-            .generatePlaySsrc()
-            .flatMap(ssrc -> {
-                String streamId = DigestUtils.md5Hex(getDeviceId() + ":" + channelId + ":" + ssrc);
-                //获取流信息
-                return Mono
-                    .zip(
-                        mediaServer.getRtpPort(streamId),
-                        getDeviceStreamMode()
-                    )
-                    .flatMap(tp2 -> Mono
-                        .defer(() -> {
-                            byte[] payload = payloadBuilder.apply(ssrc, mediaServer.getRtpIp(), tp2.getT1(), tp2.getT2());
-                            return this
-                                .sendInvite(channelId, String.format("%s:%s,%s:%s", channelId, ssrc, properties.getSipId(), 0), payload)
-                                .flatMap(transAndSdp -> {
-                                    Transaction transaction = transAndSdp.getT1();
-                                    try {
-                                        SessionDescription sdp = transAndSdp.getT2();
-                                        validateSdp(mediaServer.getRtpIp(), tp2.getT1(), sdp);
-                                        String ssrcValue = (sdp.getSsrc() == null)
-                                            ? ssrc :
-                                            sdp.getSsrc().getValue();
-                                        return mediaServer
-                                            .getRtpStreamInfo(streamId, ssrcValue, localPlayer)
-                                            .doOnNext(stream -> {
-                                                stream.setSdp(sdp.toString());
-                                                stream.setSsrc(ssrcValue);
-                                                stream.setType(type);
-                                            })
-                                            .map(stream -> Tuples.of(stream, transaction, sdp));
-                                    }catch (Throwable e){
-                                        SerializeTransaction.bye(
-                                            sipLayer,
-                                            ((SIPDialog) transaction.getDialog()),
-                                            device,
-                                            properties,
-                                            cseqInc.getAndIncrement()
-                                        );
-                                      return Mono.error(e);
-                                    }
-                                })
-                                ;
-                        }))
-                    .timeout(Duration.ofSeconds(30), Mono.error(() -> new TimeoutException("request stream timeout")))
-                    .doFinally((signalType) -> {
+    return ssrcPool
+        .generatePlaySsrc()
+        .flatMap(
+            ssrc -> {
+              String streamId = DigestUtils.md5Hex(getDeviceId() + ":" + channelId + ":" + ssrc);
+              // 获取流信息
+              return Mono.zip(mediaServer.getRtpPort(streamId), getDeviceStreamMode())
+                  .flatMap(
+                      tp2 ->
+                          Mono.defer(
+                              () -> {
+                                byte[] payload =
+                                    payloadBuilder.apply(
+                                        ssrc, mediaServer.getRtpIp(), tp2.getT1(), tp2.getT2());
+                                return this.sendInvite(
+                                        channelId,
+                                        String.format(
+                                            "%s:%s,%s:%s",
+                                            channelId, ssrc, properties.getSipId(), 0),
+                                        payload)
+                                    .flatMap(
+                                        transAndSdp -> {
+                                          Transaction transaction = transAndSdp.getT1();
+                                          try {
+                                            SessionDescription sdp = transAndSdp.getT2();
+                                            validateSdp(mediaServer.getRtpIp(), tp2.getT1(), sdp);
+                                            String ssrcValue =
+                                                (sdp.getSsrc() == null)
+                                                    ? ssrc
+                                                    : sdp.getSsrc().getValue();
+                                            return mediaServer
+                                                .getRtpStreamInfo(streamId, ssrcValue, localPlayer)
+                                                .doOnNext(
+                                                    stream -> {
+                                                      stream.setSdp(sdp.toString());
+                                                      stream.setSsrc(ssrcValue);
+                                                      stream.setType(type);
+                                                    })
+                                                .map(stream -> Tuples.of(stream, transaction, sdp));
+                                          } catch (Throwable e) {
+                                            System.out.println("01234");
+                                            SerializeTransaction.bye(
+                                                sipLayer,
+                                                ((SIPDialog) transaction.getDialog()),
+                                                device,
+                                                properties,
+                                                cseqInc.getAndIncrement());
+                                            return Mono.error(e);
+                                          }
+                                        });
+                              }))
+                  .timeout(
+                      Duration.ofSeconds(30),
+                      Mono.error(() -> new TimeoutException("request stream timeout")))
+                  .doFinally(
+                      (signalType) -> {
                         if (signalType == SignalType.ON_ERROR || signalType == SignalType.CANCEL) {
-                            ssrcPool.release(ssrc)
-                                    .subscribe();
+                          ssrcPool.release(ssrc).subscribe();
                         }
-                    });
+                      });
             });
     }