|
@@ -0,0 +1,519 @@
|
|
|
+package org.jetlinks.pro.cqfire.service.report;
|
|
|
+
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.hswebframework.ezorm.core.param.TermType;
|
|
|
+import org.hswebframework.web.api.crud.entity.PagerResult;
|
|
|
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
|
|
|
+import org.hswebframework.web.api.crud.entity.TransactionManagers;
|
|
|
+import org.hswebframework.web.crud.events.EntityCreatedEvent;
|
|
|
+import org.hswebframework.web.crud.events.EntityDeletedEvent;
|
|
|
+import org.hswebframework.web.crud.service.GenericReactiveCrudService;
|
|
|
+import org.hswebframework.web.exception.BusinessException;
|
|
|
+import org.hswebframework.web.id.IDGenerator;
|
|
|
+import org.jetlinks.core.event.EventBus;
|
|
|
+import org.jetlinks.core.event.Subscription;
|
|
|
+import org.jetlinks.pro.cqfire.entity.CorpEntity;
|
|
|
+import org.jetlinks.pro.cqfire.entity.ReportEntity;
|
|
|
+import org.jetlinks.pro.cqfire.entity.TestItemEntity;
|
|
|
+import org.jetlinks.pro.cqfire.enums.ReportTestState;
|
|
|
+import org.jetlinks.pro.cqfire.enums.TestState;
|
|
|
+import org.jetlinks.pro.cqfire.service.corp.CorpService;
|
|
|
+import org.jetlinks.pro.cqfire.service.item.TestItemService;
|
|
|
+import org.jetlinks.pro.cqfire.subscriber.DeviceTestProvider;
|
|
|
+import org.jetlinks.pro.cqfire.subscriber.DeviceTestRule;
|
|
|
+import org.jetlinks.pro.cqfire.subscriber.DeviceTestTaskExecutor;
|
|
|
+import org.jetlinks.pro.cqfire.web.report.ReportResponse;
|
|
|
+import org.jetlinks.pro.cqfire.web.report.TestItemDetail;
|
|
|
+import org.jetlinks.pro.device.entity.DeviceTestEntity;
|
|
|
+import org.jetlinks.pro.device.service.DeviceTestService;
|
|
|
+import org.jetlinks.pro.gateway.annotation.Subscribe;
|
|
|
+import org.reactivestreams.Publisher;
|
|
|
+import org.springframework.boot.CommandLineRunner;
|
|
|
+import org.springframework.context.event.EventListener;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import reactor.core.Disposable;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.function.Function;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author kyl
|
|
|
+ */
|
|
|
+@Service
|
|
|
+@Slf4j
|
|
|
+public class ReportService extends GenericReactiveCrudService<ReportEntity, String> implements CommandLineRunner {
|
|
|
+ private final CorpService corpService;
|
|
|
+ private final DeviceTestService deviceTestService;
|
|
|
+ private final TestItemService testItemService;
|
|
|
+
|
|
|
+ private final EventBus eventBus;
|
|
|
+ private final DeviceTestProvider deviceTestProvider;
|
|
|
+
|
|
|
+ private final Map<String, Disposable> subscribers = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ public ReportService(CorpService corpService,
|
|
|
+ DeviceTestService deviceTestService,
|
|
|
+ TestItemService testItemService, EventBus eventBus,
|
|
|
+ DeviceTestProvider deviceTestProvider) {
|
|
|
+ this.corpService = corpService;
|
|
|
+ this.deviceTestService = deviceTestService;
|
|
|
+ this.testItemService = testItemService;
|
|
|
+ this.eventBus = eventBus;
|
|
|
+ this.deviceTestProvider = deviceTestProvider;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Transactional(rollbackFor = Exception.class, transactionManager = TransactionManagers.reactiveTransactionManager)
|
|
|
+ public Mono<ReportEntity> createReport(ReportEntity reportEntity) {
|
|
|
+ String id = IDGenerator.SNOW_FLAKE_STRING.generate();
|
|
|
+ return testItemService
|
|
|
+ .queryByTestDeviceId(reportEntity.getDeviceId())
|
|
|
+ .switchIfEmpty(Mono.error(new BusinessException("先配置测试项!")))
|
|
|
+ .then(createQuery()
|
|
|
+ .where(ReportEntity::getDeviceId, reportEntity.getDeviceId())
|
|
|
+ .fetch()
|
|
|
+ .count()
|
|
|
+ .filter(i -> i < 1)
|
|
|
+ .switchIfEmpty(Mono.error(new BusinessException("已创建该设备测试报告!")))
|
|
|
+ .flatMap(res -> deviceTestService
|
|
|
+ .findById(reportEntity.getDeviceId())
|
|
|
+ .flatMap(entity -> {
|
|
|
+ reportEntity.setId(id);
|
|
|
+ reportEntity.setName(entity.getName() + "的报告");
|
|
|
+ reportEntity.setState(ReportTestState.test);
|
|
|
+ reportEntity.setCorpId(entity.getCorpId());
|
|
|
+ reportEntity.setEnable(false);
|
|
|
+ reportEntity.setDeviceName(entity.getName());
|
|
|
+ reportEntity.setPassedItems(0);
|
|
|
+ reportEntity.setCreateTime(System.currentTimeMillis());
|
|
|
+ return insert(reportEntity).thenReturn(reportEntity);
|
|
|
+ })));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Integer> changeReportState(String id, Boolean enable) {
|
|
|
+ return createUpdate()
|
|
|
+ .set(ReportEntity::getEnable, enable)
|
|
|
+ .where(ReportEntity::getId, id)
|
|
|
+ .execute();
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<TestItemEntity> testReportComplete(String reportId, TestItemEntity testItemEntity) {
|
|
|
+ return Mono.just(testItemEntity)
|
|
|
+ .flatMap(entity -> testItemService
|
|
|
+ .updateById(entity.getId(), entity)
|
|
|
+ .map(res -> res > 0)
|
|
|
+ .filter(item -> entity.getState().equals(TestState.passed))
|
|
|
+ .switchIfEmpty(Mono.empty())
|
|
|
+ .flatMap(a -> findById(reportId)
|
|
|
+ .flatMap(reportEntity -> createUpdate()
|
|
|
+ .where(ReportEntity::getDeviceId, entity.getDeviceTestId())
|
|
|
+ .set(ReportEntity::getPassedItems, reportEntity.getPassedItems() + 1)
|
|
|
+ .execute())
|
|
|
+ ))
|
|
|
+ .then(testItemService
|
|
|
+ .createQuery()
|
|
|
+ .where(TestItemEntity::getDeviceTestId, testItemEntity.getDeviceTestId())
|
|
|
+ .fetch()
|
|
|
+ .filter(item -> !item.getState().equals(TestState.passed)
|
|
|
+ && item.getEnableTest().equals(true))
|
|
|
+ .count()
|
|
|
+ .filter(i -> i == 0)
|
|
|
+ .flatMap(res -> createUpdate()
|
|
|
+ .where(ReportEntity::getId, reportId)
|
|
|
+ .set(ReportEntity::getPassTime, System.currentTimeMillis())
|
|
|
+ .set(ReportEntity::getState, ReportTestState.qualified)
|
|
|
+ .execute()))
|
|
|
+ .thenReturn(testItemEntity);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<ReportResponse> queryReportById(String id) {
|
|
|
+ ReportResponse response = new ReportResponse();
|
|
|
+ return createQuery()
|
|
|
+ .where(ReportEntity::getId, id)
|
|
|
+ .fetchOne()
|
|
|
+ .map(ReportResponse::of)
|
|
|
+ .flatMap(report -> deviceTestService
|
|
|
+ .findById(report.getDeviceId())
|
|
|
+ .map(deviceInstanceEntity -> response.fromDeviceTestEntity(report, deviceInstanceEntity)))
|
|
|
+ .flatMap(res -> corpService
|
|
|
+ .findById(res.getCorpId())
|
|
|
+ .flatMap(corpEntity -> {
|
|
|
+ res.setCqAddress(corpEntity.getCqAddress());
|
|
|
+ res.setShortName(corpEntity.getShortName());
|
|
|
+ return addItems(res);
|
|
|
+ }));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<PagerResult<ReportResponse>> queryAllDetails(QueryParamEntity param) {
|
|
|
+ return this.queryPager(param)
|
|
|
+ .filter(e -> !e.getData().isEmpty())
|
|
|
+ .flatMap(result -> this.convertReportAllDetail(result.getData())
|
|
|
+ .collectList()
|
|
|
+ .map(this::convertReportByCorpId)
|
|
|
+ .flatMapMany(res -> res.sort(Comparator
|
|
|
+ .comparing(ReportResponse::getTestTime)
|
|
|
+ .reversed()))
|
|
|
+ .collectList()
|
|
|
+ .map(list -> PagerResult.of(result.getTotal(), list, param)))
|
|
|
+ .defaultIfEmpty(PagerResult.of(0, Collections.emptyList(), param));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Integer> deleteDeviceAndReport(Publisher<String> idPublisher) {
|
|
|
+ return Flux.from(idPublisher)
|
|
|
+ .collectList()
|
|
|
+ .flatMap(ids -> createDelete()
|
|
|
+ .where()
|
|
|
+ .in(ReportEntity::getDeviceId, ids)
|
|
|
+ .execute()
|
|
|
+ .then(deviceTestService
|
|
|
+ .createUpdate()
|
|
|
+ .where()
|
|
|
+ .in(DeviceTestEntity::getId, ids)
|
|
|
+ .set(DeviceTestEntity::getDeleteState, true)
|
|
|
+ .execute()));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Integer> deleteOneDeviceAndReport(String deviceId) {
|
|
|
+ return createDelete()
|
|
|
+ .where(ReportEntity::getDeviceId, deviceId)
|
|
|
+ .execute()
|
|
|
+ .then(deviceTestService
|
|
|
+ .createUpdate()
|
|
|
+ .where(DeviceTestEntity::getId, deviceId)
|
|
|
+ .set(DeviceTestEntity::getDeleteState, true)
|
|
|
+ .execute());
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<ReportPdfInfo> convertPdfInfo(String deviceId, String corpId) {
|
|
|
+ return Mono
|
|
|
+ .zip(deviceTestService
|
|
|
+ .findById(deviceId),
|
|
|
+ corpService
|
|
|
+ .findById(corpId),
|
|
|
+ createQuery()
|
|
|
+ .where(ReportEntity::getDeviceId, deviceId)
|
|
|
+ .fetchOne(),
|
|
|
+ testItemService
|
|
|
+ .queryByTestDeviceId(deviceId)
|
|
|
+ .sort(Comparator.comparing(TestItemDetail::getId))
|
|
|
+ .collectList()
|
|
|
+ )
|
|
|
+ .map(tp4 -> ReportPdfInfo
|
|
|
+ .getExportDataContent(tp4.getT1(), tp4.getT2(), tp4.getT3(), tp4.getT4()));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<PagerResult<ReportResponse>> reportPublicDetail(QueryParamEntity paramEntity) {
|
|
|
+ paramEntity.and("enable", TermType.eq, true).and("state", TermType.eq, ReportTestState.qualified);
|
|
|
+
|
|
|
+ return queryPager(paramEntity)
|
|
|
+ .filter(res -> !CollectionUtils.isEmpty(res.getData()))
|
|
|
+ .flatMap(res -> this
|
|
|
+ .convertReportAllDetail(res.getData())
|
|
|
+ .collectList()
|
|
|
+ .map(this::convertReportByCorpId)
|
|
|
+ .flatMapMany(data -> data
|
|
|
+ .sort(Comparator
|
|
|
+ .comparing(ReportResponse::getTestTime)
|
|
|
+ .reversed()))
|
|
|
+ .collectList()
|
|
|
+ .map(list -> PagerResult.of(res.getTotal(), list, paramEntity)))
|
|
|
+ .defaultIfEmpty(PagerResult.empty());
|
|
|
+ }
|
|
|
+
|
|
|
+ public Flux<ReportResponse> convertReportAllDetail(List<ReportEntity> reports) {
|
|
|
+ Map<String, ReportResponse> groupMap = reports
|
|
|
+ .stream()
|
|
|
+ .collect(Collectors.toMap(ReportEntity::getDeviceId, ReportResponse::of));
|
|
|
+
|
|
|
+ return Mono
|
|
|
+ .zip(deviceTestService
|
|
|
+ .createQuery()
|
|
|
+ .where()
|
|
|
+ .in(DeviceTestEntity::getId, groupMap.keySet())
|
|
|
+ .fetch()
|
|
|
+ .collectMap(DeviceTestEntity::getId),
|
|
|
+ testItemService
|
|
|
+ .createQuery()
|
|
|
+ .where()
|
|
|
+ .in(TestItemEntity::getDeviceTestId, groupMap.keySet())
|
|
|
+ .fetch()
|
|
|
+ .collectList()
|
|
|
+ )
|
|
|
+ .flatMapMany(tp2 -> {
|
|
|
+ for (Map.Entry<String, ReportResponse> entry : groupMap.entrySet()) {
|
|
|
+ if (tp2.getT1().get(entry.getKey()) == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ //复制属性
|
|
|
+ entry.getValue().fromDeviceTestEntity(entry.getValue(), tp2.getT1().get(entry.getKey()));
|
|
|
+
|
|
|
+ List<TestItemDetail> itemDetails = new ArrayList<>();
|
|
|
+ tp2.getT2()
|
|
|
+ .forEach(item -> {
|
|
|
+ if (item.getDeviceTestId().equals(entry.getKey())) {
|
|
|
+ TestItemDetail detail = new TestItemDetail();
|
|
|
+ detail.setId(item.getId());
|
|
|
+ detail.setName(item.getName());
|
|
|
+ detail.setState(item.getState());
|
|
|
+ detail.setEnableTest(item.getEnableTest());
|
|
|
+ itemDetails.add(detail);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ entry.getValue().setProperties(itemDetails);
|
|
|
+ }
|
|
|
+ return Flux.fromIterable(groupMap.values());
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<ReportResponse> queryReportByDeviceId(String deviceId) {
|
|
|
+ return createQuery()
|
|
|
+ .where(ReportEntity::getDeviceId, deviceId)
|
|
|
+ .fetchOne()
|
|
|
+ .map(ReportResponse::of)
|
|
|
+ .flatMap(res -> corpService
|
|
|
+ .findById(res.getCorpId())
|
|
|
+ .flatMap(corpEntity -> {
|
|
|
+ res.setCqAddress(corpEntity.getCqAddress());
|
|
|
+ res.setCorpName(corpEntity.getName());
|
|
|
+ return addItems(res);
|
|
|
+ }));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Flux<ReportResponse> convertReportByCorpId(List<ReportResponse> report) {
|
|
|
+ List<String> corpIds = report.stream().map(ReportResponse::getCorpId).collect(Collectors.toList());
|
|
|
+
|
|
|
+ return corpService
|
|
|
+ .createQuery()
|
|
|
+ .where()
|
|
|
+ .in(CorpEntity::getId, corpIds)
|
|
|
+ .fetch()
|
|
|
+ .collectMap(CorpEntity::getId)
|
|
|
+ .flatMap(entity -> {
|
|
|
+ report.forEach(response -> {
|
|
|
+ response.setShortName(entity.get(response.getCorpId()).getShortName() == null ? "" : entity
|
|
|
+ .get(response.getCorpId())
|
|
|
+ .getShortName());
|
|
|
+ response.setCqAddress(entity.get(response.getCorpId()).getCqAddress() == null ? "" : entity
|
|
|
+ .get(response.getCorpId())
|
|
|
+ .getCqAddress());
|
|
|
+ });
|
|
|
+ return Mono.just(report);
|
|
|
+ })
|
|
|
+ .flatMapMany(Flux::fromIterable);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<ReportResponse> addItems(ReportResponse reportResponse) {
|
|
|
+ return testItemService
|
|
|
+ .queryByTestDeviceId(reportResponse.getDeviceId())
|
|
|
+ .sort(Comparator.comparing(TestItemDetail::getId))
|
|
|
+ .collectList()
|
|
|
+ .flatMap(res -> {
|
|
|
+ reportResponse.setProperties(res);
|
|
|
+ return Mono.just(reportResponse);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public Flux<DeviceTestEntity> findAllTestingDevice() {
|
|
|
+ return this.createQuery()
|
|
|
+ .where()
|
|
|
+ .and(ReportEntity::getState, ReportTestState.test)
|
|
|
+ .fetch()
|
|
|
+ .map(ReportEntity::getDeviceId)
|
|
|
+ .collectList()
|
|
|
+ .flatMapMany(testDeviceIds -> deviceTestService
|
|
|
+ .createQuery()
|
|
|
+ .where()
|
|
|
+ .in(DeviceTestEntity::getId, testDeviceIds)
|
|
|
+ .fetch()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ private void doReportChange(ReportEntity reportEntity) {
|
|
|
+ eventBus.publish("/report-changed", reportEntity)
|
|
|
+ .retry(3)
|
|
|
+ .doOnNext(l -> handleSubscribe(reportEntity))
|
|
|
+ .subscribe();
|
|
|
+ }
|
|
|
+
|
|
|
+ @EventListener
|
|
|
+ public void handleEvent(EntityCreatedEvent<ReportEntity> event) {
|
|
|
+ event.getEntity().forEach(this::doReportChange);
|
|
|
+ }
|
|
|
+
|
|
|
+ // @EventListener
|
|
|
+// public void handleEvent(EntitySavedEvent<ReportEntity> event) {
|
|
|
+// event.getEntity().forEach(this::doReportChange);
|
|
|
+// }
|
|
|
+//
|
|
|
+// @EventListener
|
|
|
+// public void handleEvent(EntityModifyEvent<ReportEntity> event) {
|
|
|
+// //event.getAfter().forEach(this::doReportChange);
|
|
|
+// event.async(
|
|
|
+// autoControlReportChange(event.getBefore(), event.getAfter())
|
|
|
+// );
|
|
|
+// }
|
|
|
+
|
|
|
+ @EventListener
|
|
|
+ public void handleEvent(EntityDeletedEvent<ReportEntity> event) {
|
|
|
+ event.getEntity().forEach(entity -> {
|
|
|
+ entity.setState(ReportTestState.toBeTest);
|
|
|
+ doReportChange(entity);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private Flux<Void> autoControlReportChange(List<ReportEntity> beforeList, List<ReportEntity> afterList) {
|
|
|
+ return Flux.fromIterable(beforeList)
|
|
|
+ .collect(Collectors.toMap(ReportEntity::getId, Function.identity()))
|
|
|
+ .flatMapMany(beforeMap -> Flux
|
|
|
+ .fromIterable(afterList)
|
|
|
+ .doOnNext(after -> {
|
|
|
+ ReportEntity before = beforeMap.get(after.getId());
|
|
|
+ if (ReportTestState.test.equals(after.getState())
|
|
|
+ && !ReportTestState.test.equals(before.getState())) {
|
|
|
+ doReportChange(after);
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .then()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 监听测试设备删除事件,判断改设备是否已开始测试
|
|
|
+ *
|
|
|
+ * @param event
|
|
|
+ */
|
|
|
+ @EventListener
|
|
|
+ public void handleDeviceTestDelete(EntityDeletedEvent<DeviceTestEntity> event) {
|
|
|
+ event.async(getReportState(event.getEntity()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<Void> getReportState(List<DeviceTestEntity> list) {
|
|
|
+ return createQuery()
|
|
|
+ .where()
|
|
|
+ .in(ReportEntity::getDeviceId, list
|
|
|
+ .stream()
|
|
|
+ .map(DeviceTestEntity::getId)
|
|
|
+ .collect(Collectors.toList())
|
|
|
+ )
|
|
|
+ .fetch()
|
|
|
+ .flatMap(reportEntity -> {
|
|
|
+ if (ReportTestState.test.equals(reportEntity.getState())) {
|
|
|
+ return Mono.error(new BusinessException("该设备已开始测试"));
|
|
|
+ } else if (ReportTestState.qualified.equals(reportEntity.getState())) {
|
|
|
+ return Mono.error(new BusinessException("该设备已通过测试"));
|
|
|
+ }
|
|
|
+ return Mono.empty();
|
|
|
+ })
|
|
|
+ .then();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Subscribe(value = "/report-changed", features = Subscription.Feature.broker)
|
|
|
+ public void handleSubscribe(ReportEntity entity) {
|
|
|
+
|
|
|
+
|
|
|
+ //停止测试
|
|
|
+ if (!ReportTestState.test.equals(entity.getState())) {
|
|
|
+ Optional.ofNullable(subscribers.remove(entity.getId()))
|
|
|
+ .ifPresent(Disposable::dispose);
|
|
|
+ log.debug("unsubscribe:{}({}),{}", entity.getDeviceName(), entity.getDeviceId(), entity.getId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Disposable old = subscribers
|
|
|
+ .put(entity.getId(),
|
|
|
+ deviceTestService
|
|
|
+ .createQuery()
|
|
|
+ .where()
|
|
|
+ .and(DeviceTestEntity::getId, entity.getDeviceId())
|
|
|
+ .fetchOne()
|
|
|
+ .flatMap(deviceTestEntity -> createRule(entity, deviceTestEntity)
|
|
|
+ .flatMap(this::createTask))
|
|
|
+ .flatMapMany(DeviceTestTaskExecutor::doSubscribe)
|
|
|
+ .then()
|
|
|
+ //.then(eventBus.publish("/device-test/", "")) //推送
|
|
|
+ .subscribe()
|
|
|
+ );
|
|
|
+ log.debug("subscribe device test:{}{}", entity.getDeviceName(), entity.getId());
|
|
|
+
|
|
|
+ if (null != old) {
|
|
|
+ log.debug("close old subscribe device test:{}{}", entity.getDeviceName(), entity.getId());
|
|
|
+ old.dispose();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<DeviceTestRule> createRule(ReportEntity reportEntity, DeviceTestEntity testEntity) {
|
|
|
+
|
|
|
+ DeviceTestRule rule = new DeviceTestRule();
|
|
|
+ rule.setId(reportEntity.getId());
|
|
|
+ rule.setProductId(testEntity.getProductId());
|
|
|
+ rule.setDeviceId(testEntity.getSourceAddress());
|
|
|
+ rule.setProductName(testEntity.getProductName());
|
|
|
+ rule.setDeviceName(testEntity.getName());
|
|
|
+ return testItemService.createQuery()
|
|
|
+ .where()
|
|
|
+ .and(TestItemEntity::getDeviceTestId, testEntity.getId())
|
|
|
+ //只测试必测项
|
|
|
+ .and(TestItemEntity::getEnableTest, true)
|
|
|
+ .not(TestItemEntity::getState, TestState.passed)
|
|
|
+ .fetch()
|
|
|
+ .collectList()
|
|
|
+ .flatMap(items -> {
|
|
|
+ List<DeviceTestRule.Trigger> triggers = items
|
|
|
+ .stream()
|
|
|
+ //.filter(TestItemEntity::getEnableTest)
|
|
|
+ .map(testItem -> {
|
|
|
+
|
|
|
+ DeviceTestRule.Trigger trigger = new DeviceTestRule.Trigger();
|
|
|
+ trigger.setTestItemId(testItem.getId());
|
|
|
+ trigger.setModelId(testItem.getKey());
|
|
|
+ trigger.setType(DeviceTestRule.MessageType.getType(testItem
|
|
|
+ .getType()
|
|
|
+ .getValue()));
|
|
|
+ if (null != createFilter(testItem)) {
|
|
|
+ trigger.setFilters(Collections.singletonList(createFilter(testItem)));
|
|
|
+ }
|
|
|
+
|
|
|
+ return trigger;
|
|
|
+ })
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ rule.setTriggers(triggers);
|
|
|
+ return Mono.just(rule);
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private DeviceTestRule.ConditionFilter createFilter(TestItemEntity itemEntity) {
|
|
|
+
|
|
|
+ return DeviceTestRule
|
|
|
+ .MessageType
|
|
|
+ .getType(itemEntity.getType().getValue())
|
|
|
+ .createOperator(
|
|
|
+ itemEntity.getKey(),
|
|
|
+ itemEntity.getOperator(),
|
|
|
+ itemEntity.getCondition(),
|
|
|
+ itemEntity.getParameter()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<DeviceTestTaskExecutor> createTask(DeviceTestRule rule) {
|
|
|
+ return Mono.just(new DeviceTestTaskExecutor(eventBus,
|
|
|
+ this,
|
|
|
+ testItemService,
|
|
|
+ rule));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(String... args) {
|
|
|
+ createQuery()
|
|
|
+ .where()
|
|
|
+ .and(ReportEntity::getState, ReportTestState.test)
|
|
|
+ .fetch()
|
|
|
+ .doOnNext(this::doReportChange)
|
|
|
+ .subscribe();
|
|
|
+ }
|
|
|
+}
|