Code

package com.galaxy.controller.v1;

import com.galaxy.domain.ReportDomainService;
import com.galaxy.enumeration.ReportType;
import com.galaxy.model.dto.ReportUpdateDto;
import com.galaxy.model.vo.ReportDocumentVo;
import com.galaxy.module.controller.PltBaseController;
import com.galaxy.module.resp.SuccessResp;
import lombok.SneakyThrows;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid;
import java.util.Map;

@Validated
@RestController
@RequestMapping("/v1/report")
public class ReportManageController extends PltBaseController {

    private final ReportDomainService reportDomainService;

    public ReportManageController(HttpServletRequest request, ReportDomainService reportDomainService) {
        super(request);
        this.reportDomainService = reportDomainService;
    }

    @PostMapping
    public SuccessResp<Void> createReportRecord(@RequestBody Map<String, Object> searchParam) {
        reportDomainService.create(searchParam, getAccountId(), getAccount());
        return new SuccessResp<>();
    }


    @PutMapping
    public SuccessResp<Void> makeReportDocument(@RequestBody @Valid ReportUpdateDto dto) {
        reportDomainService.makeReportDocument(dto, getAccountId(), getAccount());
        return new SuccessResp<>();
    }

    @GetMapping("/download")
    @SneakyThrows
    public SuccessResp<ReportDocumentVo> getDocument(@RequestParam Long id, @RequestParam ReportType reportType, HttpServletResponse response) {
        return new SuccessResp<>(reportDomainService.downloadDocument(id, reportType));

    }

}

package com.galaxy.converter;

import com.galaxy.model.vo.ReportDownloadRecordVo;
import com.galaxy.model.vo.ReportQryVo;
import com.galaxy.storage.rdbms.entity.ReportDownloadRecordEntity;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

@Mapper
public interface ReportConverter {

    ReportConverter INSTANCES = Mappers.getMapper(ReportConverter.class);


    ReportQryVo toQryVo(ReportDownloadRecordEntity entity);

    ReportDownloadRecordVo toVo(ReportDownloadRecordEntity entity);

}

package com.galaxy.domain.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.galaxy.domain.ReportDomainService;
import com.galaxy.enumeration.ReportDocumentType;
import com.galaxy.enumeration.ReportType;
import com.galaxy.model.dto.ReportUpdateDto;
import com.galaxy.model.vo.ReportDocumentCombineVo;
import com.galaxy.model.vo.ReportDocumentVo;
import com.galaxy.model.vo.ReportDownloadRecordVo;
import com.galaxy.module.Sender;
import com.galaxy.module.exception.BusinessException;
import com.galaxy.module.rabbit.model.RabbitSendWith;
import com.galaxy.service.FileManageService;
import com.galaxy.service.ReportDocumentService;
import com.galaxy.service.ReportDownloadRecordService;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.galaxy.module.error.Error.BAD_REQUEST_ERROR;
import static com.galaxy.module.error.Error.DATA_IS_CREATED;
import static com.galaxy.module.error.Error.DATA_NOT_FOUND;
import static com.galaxy.mq.rabbit.config.ReportConfig.COMBINE_FILE_EX;
import static com.galaxy.mq.rabbit.config.ReportConfig.COMBINE_FILE_Q;
import static com.galaxy.mq.rabbit.config.ReportConfig.REPORT_CACHE_MINUTE;

@Slf4j
@Service
public class ReportDomainServiceImpl implements ReportDomainService {

    private final ObjectMapper objectMapper;
    private final ReportDownloadRecordService reportDownloadRecordService;
    private final FileManageService fileManageService;
    private final Sender<RabbitSendWith> rabbitSender;
    private final Set<ReportDocumentService> reportDocumentServices;

    public ReportDomainServiceImpl(ObjectMapper objectMapper,
                                   ReportDownloadRecordService reportDownloadRecordService,
                                   FileManageService fileManageService,
                                   @Qualifier("normalRabbitSender") Sender<RabbitSendWith> rabbitSender,
                                   Set<ReportDocumentService> reportDocumentServices) {
        this.objectMapper = objectMapper;
        this.reportDownloadRecordService = reportDownloadRecordService;
        this.fileManageService = fileManageService;
        this.rabbitSender = rabbitSender;
        this.reportDocumentServices = Collections.unmodifiableSet(reportDocumentServices);
    }

    @Override
    @SneakyThrows
    public void create(Map<String, Object> searchParam, @NonNull Long accountId, @NonNull String account) {
        ReportType reportType = Optional.ofNullable(searchParam.get("reportType"))
                .map(String::valueOf)
                .map(ReportType::valueOf)
                .orElseThrow(() -> new BusinessException(BAD_REQUEST_ERROR, "請傳入報表類型:reportType"));

        String searchParamHash = DigestUtils.md5Hex(objectMapper.writeValueAsString(searchParam));

        if (reportDownloadRecordService.duplicateQry(searchParamHash, accountId, Duration.ofMinutes(REPORT_CACHE_MINUTE))) {
            throw new BusinessException(DATA_IS_CREATED);
        }

        reportDownloadRecordService.createAndSendQry(reportType, searchParamHash, searchParam, account, accountId);
    }

    @Override
    public void makeReportDocument(ReportUpdateDto dto, @NonNull Long accountId, @NonNull String account) {
        if (!reportDownloadRecordService.isRunningStatus(dto.getId())) {
            throw new BusinessException(DATA_NOT_FOUND);
        }

        ReportDocumentType reportDocumentType = ReportDocumentType.valueOf(dto.getReportExportType().name());
        String errorMessage = null;
        try {
            fileManageService.createBlob(
                    getTempDocument(dto),
                    makeDocument(dto, reportDocumentType)
            );
        } catch (BusinessException | UnsupportedOperationException e) {
            errorMessage = e.getMessage();
            log.warn(errorMessage, e);
        } catch (Throwable e) {
            errorMessage = "檔案處理失敗!!!";
            log.error(errorMessage, e);
        }

        if (Objects.nonNull(errorMessage)) {
            // 執行失敗,這筆資料已經無法處理
            reportDownloadRecordService.updateStatusToFail(dto.getId(), errorMessage);
            return;
        }

        String tempDirectory = getTempDirectory(reportDocumentType, dto.getId());
        List<String> directoryFileNames = fileManageService.getDirectoryOfFileNames(tempDirectory);
        if (dto.getTotalPage() != directoryFileNames.size()) return;

        // storage 資料筆數滿足
        ReportDocumentCombineVo qryVo = ReportDocumentCombineVo.builder()
                .id(dto.getId())
                .reportDocumentType(reportDocumentType)
                .build();
        rabbitSender.send(new RabbitSendWith(COMBINE_FILE_EX, COMBINE_FILE_Q), qryVo);

    }

    private String getTempDocument(ReportUpdateDto dto) {
        ReportDocumentType reportDocumentType = ReportDocumentType.valueOf(dto.getReportExportType().name());

        String directory = getTempDirectory(reportDocumentType, dto.getId());
        String fileName = dto.getCurrentPage() + reportDocumentType.getFileSuffix();

        return directory + fileName;
    }

    private String getTempDirectory(ReportDocumentType reportDocumentType, Long id) {
        String documentPrefixPath = fileManageService.getDocumentPrefixPath(reportDocumentType);
        return String.format("%s%d-temp/", documentPrefixPath, id);
    }

    private String getDirectory(ReportDocumentType reportDocumentType, Long id) {
        String documentPrefixPath = fileManageService.getDocumentPrefixPath(reportDocumentType);
        return String.format("%s%d/", documentPrefixPath, id);
    }


    @Override
    public void updateStatusFail(Long id) {
        reportDownloadRecordService.updateStatusFail(id);
    }

    @Override
    public void makeCombineDocument(ReportDocumentCombineVo vo) {
        ReportDocumentType reportDocumentType = vo.getReportDocumentType();
        String tempDirectory = getTempDirectory(reportDocumentType, vo.getId());
        String documentPath = String.format("%s%s%s",
                getDirectory(reportDocumentType, vo.getId()),
                vo.getId(),
                reportDocumentType.getFileSuffix()
        );

        String errorMessage = null;
        try {
            List<String> directoryFileNames = fileManageService.getDirectoryOfFileNames(tempDirectory);

            List<byte[]> documents = directoryFileNames.stream()
                    .map(fileManageService::getDocument)
                    .collect(Collectors.toList());

            byte[] bytes = getReportDocumentService(reportDocumentType).combineDocument(documents);

            fileManageService.createBlob(documentPath, bytes);
        } catch (Throwable e) {
            errorMessage = "報表合併失敗";
            log.error(errorMessage, e);
        }

        reportDownloadRecordService.updateStatus(vo.getId(), documentPath, errorMessage);
        cleanRecordTempDocuments(tempDirectory);
    }

    @Override
    public ReportDocumentVo downloadDocument(Long id, ReportType reportType) {
        ReportDownloadRecordVo qryVo = reportDownloadRecordService.qryRecord(id, reportType);
        if (Objects.isNull(qryVo)) throw new BusinessException(DATA_NOT_FOUND);

        String filePath = qryVo.getFilePath();
        byte[] document = fileManageService.getDocument(filePath);

        String fileSuffix = filePath.substring(filePath.lastIndexOf("."));

        return ReportDocumentVo.builder()
                .id(id)
                .bytes(document)
                .fileSuffix(fileSuffix)
                .build();
    }

    private void cleanRecordTempDocuments(String directory) {
        fileManageService.deleteDirectory(directory);
    }

    private byte[] makeDocument(ReportUpdateDto dto, ReportDocumentType reportDocumentType) {
        ReportDocumentService reportDocumentService = getReportDocumentService(reportDocumentType);
        return reportDocumentService.makeDocument(dto.getTitles(), dto.getRows());
    }

    private ReportDocumentService getReportDocumentService(ReportDocumentType reportDocumentType) {
        ReportDocumentService reportDocumentService = reportDocumentServices.stream().
                filter(service -> service.accept(reportDocumentType))
                .findFirst()
                .orElseThrow(() -> new UnsupportedOperationException(reportDocumentType.name() + "尚未實現:ReportDocumentService"));
        return reportDocumentService;
    }

}

package com.galaxy.domain;

import com.galaxy.enumeration.ReportType;
import com.galaxy.model.dto.ReportUpdateDto;
import com.galaxy.model.vo.ReportDocumentCombineVo;
import com.galaxy.model.vo.ReportDocumentVo;

import java.util.Map;

public interface ReportDomainService {

    void create(Map<String, Object> searchParam, Long accountId, String account);

    void makeReportDocument(ReportUpdateDto dto, Long accountId, String account);

    void updateStatusFail(Long id);

    void makeCombineDocument(ReportDocumentCombineVo payload);

    ReportDocumentVo downloadDocument(Long id, ReportType reportType);
}

package com.galaxy.enumeration;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
@Getter
public enum ReportDocumentType {
    CSV("/doc/report/csv", ".csv"),
    PDF("/doc/report/pdf", ".pdf");

    private final String path;
    private final String fileSuffix;

}

package com.galaxy.enumeration;

public enum ReportExportType {
    CSV
}

package com.galaxy.enumeration;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
@Getter
public enum ReportStatus {
    RUNNING(0, "處理中"),
    SUCCESS(1, "成功"),
    FAIL(2, "失敗");
    private final Integer code;
    private final String desc;

}

package com.galaxy.enumeration;

import lombok.Getter;

@Getter
public enum ReportType {
    PROXY("PROXY", "代理列表"),
    PROXY_DOMAIN("PROXY_DOMAIN", "代理域名")
    ;

    private final String type;
    private final String desc;

    ReportType(String type, String desc) {
        this.type = type;
        this.desc = desc;
    }
}

delete ReportTypeEnum.java
ReportCreateDto.java
package com.galaxy.model.dto;

import com.galaxy.enumeration.ReportExportType;
import lombok.Data;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.List;

@Data
public class ReportUpdateDto {
    @NotNull
    @Min(0)
    private Long id;
    @NotNull
    private ReportExportType reportExportType;
    @NotNull
    private List<String> titles;
    @NotNull
    private List<List<?>> rows;
    @NotNull
    private Integer currentPage;
    @NotNull
    private Integer totalPage;

}

package com.galaxy.model.vo;

import com.galaxy.enumeration.ReportType;
import lombok.Data;

import java.util.Map;

@Data
public class ReportCreateVo {
    private String routingKey;
    private String source;
    private ReportType reportType;
    private String searchParamHash;
    private Map<String, String> searchParam;
    private String errorMessage;
    private Long creatorId;
    private String creator;
}

package com.galaxy.model.vo;

import com.galaxy.enumeration.ReportDocumentType;
import lombok.Builder;
import lombok.Data;

import javax.validation.constraints.NotNull;

@Data
@Builder
public class ReportDocumentCombineVo {
    @NotNull
    private Long id;
    @NotNull
    private ReportDocumentType reportDocumentType;

}

package com.galaxy.model.vo;

import lombok.Builder;
import lombok.Data;

import javax.validation.constraints.NotNull;

@Data
@Builder
public class ReportDocumentVo {
    @NotNull
    private Long id;
    @NotNull
    private byte[] bytes;
    @NotNull
    private String fileSuffix;


}

```` package com.galaxy.model.vo;

import com.galaxy.enumeration.ReportStatus; import com.galaxy.enumeration.ReportType; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data;

import java.util.Map;

@Data @Builder @AllArgsConstructor public class ReportDownloadRecordVo {

private Long id;
private String source; // 报表来源,中文說明
private ReportType reportType; // 報表名稱
private String filePath; // 檔案路徑
private ReportStatus status; // RUNNING:處理中 SUCCESS:成功 FAIL:失敗
private String searchParamHash; // md5(searchParam)

private Map<String, Object> searchParam; // 查詢條件
private String errorMessage; // 錯誤訊息

}




package com.galaxy.model.vo;

import com.galaxy.enumeration.ReportType; import lombok.Builder; import lombok.Data;

import javax.validation.constraints.NotNull; import java.util.Map;

@Data @Builder public class ReportQryVo { @NotNull private Long id; @NotNull private ReportType reportType; @NotNull private Map searchParam;

}


package com.galaxy.mq.rabbit.config;

import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

@Configuration public class ReportConfig { // TODO // FIXME mq sync to normal

public static final Integer REPORT_CACHE_MINUTE = 5; // cache 5分鐘
/**
 * 報表 ROUTING KEY 規則
 */
public final static String REPORT_TOPIC_ROUTING_KEY_RULE = "plt.basic.report.topic.%s.q";
/**
 * 報表使用TopicExchange
 */
public final static String REPORT_TOPIC_EX = "plt.basic.report.topic.ex";
public final static String MONITOR_RUN_FAILURE_Q = "plt.basic.report.topic.q";
public final static String MONITOR_RUN_FAILURE_DATA_KEY = "plt.basic.report.topic.#";
private final static Integer DELAY_QUEUE_DELAY_TIME = REPORT_CACHE_MINUTE * 60 * 1000;
private final static String DELAY_QUEUE_CHANNEL_Q = "plt.basic.report.delay.channel.q"; // 延遲隊列
public final static String DELAY_QUEUE_EX = "plt.basic.report.delay.ex";
public final static String DELAY_QUEUE_Q = "plt.basic.report.delay.q";
public final static String DEAD_QUEUE_EX = "plt.basic.report.dead.ex";
public final static String DEAD_QUEUE_Q = "plt.basic.report.dead.q";
public final static String COMBINE_FILE_EX = "plt.basic.report.combine.file.ex";
public final static String COMBINE_FILE_Q = "plt.basic.report.combine.file.q";

@Bean("reportTopicExchange")
public TopicExchange reportTopicExchange(@Qualifier("normalAdmin") RabbitAdmin admin) {
    TopicExchange topicExchange = new TopicExchange(REPORT_TOPIC_EX);
    topicExchange.setAdminsThatShouldDeclare(admin);
    return topicExchange;
}

@Bean("reportCommonQueue")
public Queue reportCommonQueue(@Qualifier("normalAdmin") RabbitAdmin admin) {
    Queue q = QueueBuilder.durable(MONITOR_RUN_FAILURE_Q).quorum().build();
    q.setAdminsThatShouldDeclare(admin);
    return q;
}

@Bean("reportCommonQueueBinding")
public Binding reportCommonQueueBinding(@Qualifier("normalAdmin") RabbitAdmin admin,
                                        @Qualifier("reportTopicExchange") TopicExchange topicExchange,
                                        @Qualifier("reportCommonQueue") Queue q) {

    Binding b = BindingBuilder.bind(q).to(topicExchange).with(MONITOR_RUN_FAILURE_DATA_KEY);
    b.setAdminsThatShouldDeclare(admin);
    return b;
}

@Bean("reportDelayExchange")
public DirectExchange reportDelayExchange(@Qualifier("normalAdmin") RabbitAdmin admin) {
    DirectExchange dex = new DirectExchange(DELAY_QUEUE_EX);
    dex.setAdminsThatShouldDeclare(admin);
    return dex;
}

@Bean("reportDelayQueue")
public Queue reportDelayQueue(@Qualifier("normalAdmin") RabbitAdmin admin) {
    Queue q = QueueBuilder.durable(DELAY_QUEUE_Q)
            .quorum()
            .deadLetterExchange(DEAD_QUEUE_EX)
            .deadLetterRoutingKey(DEAD_QUEUE_Q)
            .ttl(DELAY_QUEUE_DELAY_TIME)
            .build();
    q.setAdminsThatShouldDeclare(admin);
    return q;
}

@Bean("delayQueueBinding")
public Binding delayQueueBinding(@Qualifier("normalAdmin") RabbitAdmin admin,
                                 @Qualifier("reportDelayExchange") DirectExchange dex,
                                 @Qualifier("reportDelayQueue") Queue q) {
    Binding b = BindingBuilder.bind(q).to(dex).with(DELAY_QUEUE_CHANNEL_Q);
    b.setAdminsThatShouldDeclare(admin);
    return b;
}


@Bean("reportDeadExchange")
public DirectExchange reportDeadExchange(@Qualifier("normalAdmin") RabbitAdmin admin) {
    DirectExchange dex = new DirectExchange(DEAD_QUEUE_EX);
    dex.setAdminsThatShouldDeclare(admin);
    return dex;
}

@Bean("reportDeadQueue")
public Queue reportDeadQueue(@Qualifier("normalAdmin") RabbitAdmin admin) {
    Queue q = QueueBuilder.durable(DEAD_QUEUE_Q)
            .quorum()
            .build();
    q.setAdminsThatShouldDeclare(admin);
    return q;
}

@Bean("reportDeadQueueBinding")
public Binding reportDeadQueueBinding(@Qualifier("normalAdmin") RabbitAdmin admin,
                                      @Qualifier("reportDeadExchange") DirectExchange dex,
                                      @Qualifier("reportDeadQueue") Queue q) {
    Binding b = BindingBuilder.bind(q).to(dex).with(DEAD_QUEUE_Q);
    b.setAdminsThatShouldDeclare(admin);
    return b;
}


@Bean("combineFileEx")
public FanoutExchange combineFileEx(@Qualifier("normalAdmin") RabbitAdmin admin) {
    FanoutExchange ex = new FanoutExchange(COMBINE_FILE_EX);
    ex.setAdminsThatShouldDeclare(admin);
    return ex;
}

@Bean("combineFileQ")
public Queue combineFileQ(@Qualifier("normalAdmin") RabbitAdmin admin) {
    Queue q = QueueBuilder.durable(COMBINE_FILE_Q).quorum().build();
    q.setAdminsThatShouldDeclare(admin);
    return q;
}

@Bean
public Binding combineFileQBinding(@Qualifier("normalAdmin") RabbitAdmin admin,
                                   @Qualifier("combineFileEx") FanoutExchange ex,
                                   @Qualifier("combineFileQ") Queue q) {
    Binding b = BindingBuilder.bind(q).to(ex);
    b.setAdminsThatShouldDeclare(admin);
    return b;
}

}


package com.galaxy.mq.rabbit;

import com.galaxy.domain.ReportDomainService; import com.galaxy.model.vo.ReportDocumentCombineVo; import com.galaxy.model.vo.ReportQryVo; import com.galaxy.module.Sender; import com.galaxy.module.rabbit.listener.RabbitListenerRequeueable; import com.galaxy.module.rabbit.model.RabbitSendWith; import com.rabbitmq.client.Channel; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated;

import static com.galaxy.mq.rabbit.config.ReportConfig.COMBINE_FILE_EX; import static com.galaxy.mq.rabbit.config.ReportConfig.COMBINE_FILE_Q; import static com.galaxy.mq.rabbit.config.ReportConfig.DEAD_QUEUE_EX; import static com.galaxy.mq.rabbit.config.ReportConfig.DEAD_QUEUE_Q; import static com.galaxy.mq.rabbit.config.ReportConfig.DELAY_QUEUE_Q; import static com.galaxy.mq.rabbit.config.ReportConfig.MONITOR_RUN_FAILURE_Q; import static com.galaxy.mq.rabbit.config.ReportConfig.REPORT_TOPIC_EX;

@Slf4j @Validated @Component public class ReportListener extends com.galaxy.module.rabbit.listener.RabbitListener {

private final Sender<RabbitSendWith> rabbitSender;
private final ReportDomainService reportDomainService;


public ReportListener(@Qualifier("syncRabbitSender") Sender<RabbitSendWith> rabbitSender,
                      ReportDomainService reportDomainService) {
    this.rabbitSender = rabbitSender;
    this.reportDomainService = reportDomainService;
}

@SneakyThrows
@RabbitListener(queues = MONITOR_RUN_FAILURE_Q, containerFactory = "normalContainerFactory", concurrency = "1")
public void registerReportFailListener(@Payload Message<ReportQryVo> messages, Channel channel) {
    processMessageOrRequeue(messages, channel,
            new RabbitListenerRequeueable.RequeueInfo(REPORT_TOPIC_EX, MONITOR_RUN_FAILURE_Q,
                    MONITOR_RUN_FAILURE_Q, rabbitSender),
            () -> {
                ReportQryVo qryVo = messages.getPayload();
                rabbitSender.send(new RabbitSendWith("", DELAY_QUEUE_Q), qryVo);
                log.info("send message to delay queue");
            });
}

@SneakyThrows
@RabbitListener(queues = DEAD_QUEUE_Q, containerFactory = "normalContainerFactory", concurrency = "1")
public void handleFailedRecord(@Payload Message<ReportQryVo> messages, Channel channel) {
    processMessageOrRequeue(messages, channel,
            new RabbitListenerRequeueable.RequeueInfo(DEAD_QUEUE_EX, DEAD_QUEUE_Q, DEAD_QUEUE_Q, rabbitSender),
            () -> {
                ReportQryVo payload = messages.getPayload();
                reportDomainService.updateStatusFail(payload.getId());
                log.info("update message status to fail : id = " + payload.getId());
            });
}


@SneakyThrows
@RabbitListener(queues = COMBINE_FILE_Q, containerFactory = "normalContainerFactory", concurrency = "1")
public void handleCombineDocument(@Payload Message<ReportDocumentCombineVo> messages, Channel channel) {
    processMessageOrRequeue(messages, channel,
            new RabbitListenerRequeueable.RequeueInfo(COMBINE_FILE_EX, COMBINE_FILE_Q, COMBINE_FILE_Q, rabbitSender),
            () -> reportDomainService.makeCombineDocument(messages.getPayload()));
}

}



package com.galaxy.service.impl;

import com.galaxy.enumeration.ReportDocumentType; import com.galaxy.module.exception.BusinessException; import com.galaxy.service.ReportDocumentService; import lombok.NonNull; import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors;

import static com.galaxy.module.error.Error.BAD_REQUEST_ERROR;

@Service public class CSVDocumentServiceImpl implements ReportDocumentService { private DateTimeFormatter formatter = DateTimeFormatter.ISO_TIME;

public boolean accept(ReportDocumentType reportDocumentType) {
    return ReportDocumentType.CSV == reportDocumentType;
}

@Override
public byte[] makeDocument(List<String> titles, List<List<?>> rows) throws BusinessException {
    if (rows.isEmpty()) throw new BusinessException(BAD_REQUEST_ERROR, "查無資料");

    if (titles.size() != rows.get(0).size()) throw new BusinessException(BAD_REQUEST_ERROR, "表頭與資料無法匹配");

    String reportTitle = titles.stream().map(String::valueOf).collect(Collectors.joining(","));
    String reportContent = rows.stream()
            .map(this::toRowString)
            .collect(Collectors.joining(System.lineSeparator()));

    return String.format("%s%s%s", reportTitle, System.lineSeparator(), reportContent).getBytes();
}

@Override
public byte[] combineDocument(@NonNull List<byte[]> documents) {
    if (documents.isEmpty()) throw new BusinessException(BAD_REQUEST_ERROR, "google storage 查無資料");
    if (documents.size() == 1) return documents.get(0);


    String firstDocument = new String(documents.get(0));
    String nextAllRemoveTitleDocuments = documents.stream()
            .skip(1)
            .map(this::removeCsvTitles)
            .collect(Collectors.joining(System.lineSeparator()));

    String csv = firstDocument + System.lineSeparator() + nextAllRemoveTitleDocuments;
    return csv.getBytes(StandardCharsets.UTF_8);
}

private String removeCsvTitles(byte[] rowBytes) {
    String[] rows = new String(rowBytes).split(System.lineSeparator());
    return Arrays.stream(rows)
            .skip(1)
            .collect(Collectors.joining(System.lineSeparator()));
}

private String toRowString(List<?> row) {
    return row.stream()
            .map(this::toCellString)
            .collect(Collectors.joining(","));
}

private String toCellString(Object cell) {
    if (cell == null) return "";

    switch (cell.getClass().getName()) {
        case "java.time.OffsetDateTime":
            return formatter.format((OffsetDateTime) cell);
        default:
            return cell.toString();
    }
}

}



package com.galaxy.service.impl;

import com.galaxy.enumeration.ImagePath; import com.galaxy.enumeration.ReportDocumentType; import com.galaxy.enumeration.VideoPath; import com.galaxy.model.vo.UploadFileVo; import com.galaxy.service.FileManageService; import com.galaxy.utils.GcpUtil; import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.springframework.lang.NonNull; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile;

import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List;

@Slf4j @Service @RequiredArgsConstructor public class GcsFileManageServiceImpl implements FileManageService {

private final Storage storage;

@SneakyThrows
@Override
public void createBlob(@NonNull String filePath, @NonNull MultipartFile multipartFile) {
    createBlob(filePath, multipartFile.getBytes());
}

@Override
public void createBlob(@NonNull String path, @NonNull byte[] content) {
    String bucketName = GcpUtil.getBucketName();
    BlobId blobId = BlobId.of(bucketName, path);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
    storage.create(blobInfo, content);
}

@Override
public String getDocumentPrefixPath(@NonNull ReportDocumentType reportDocumentType) {
    return String.format("/%s%s/", GcpUtil.getPrefixFolder(), reportDocumentType.getPath());
}

@SneakyThrows
@Override
public String getImagePath(@NonNull UploadFileVo updateFileVo, @NonNull Integer imagePathType) {
    String md5Hex = DigestUtils.md5Hex(updateFileVo.getFile().getInputStream());
    ImagePath imagePath = ImagePath.valueOf(imagePathType);
    return GcpUtil.getPrefixFolder() + imagePath.getPath() + getFolderMd5Path(updateFileVo.getPltCode()) + generateFileName(md5Hex, updateFileVo.getFileSuffix());
}

@SneakyThrows
@Override
public String getVideoPath(@NonNull UploadFileVo updateFileVo, @NonNull Integer videoPathType) {
    String md5Hex = DigestUtils.md5Hex(updateFileVo.getFile().getInputStream());
    VideoPath videoPath = VideoPath.valueOf(videoPathType);
    return GcpUtil.getPrefixFolder() + videoPath.getPath() + getFolderMd5Path(updateFileVo.getPltCode()) + generateFileName(md5Hex, updateFileVo.getFileSuffix());
}

@Override
public byte[] getDocument(String path) {
    String bucketName = GcpUtil.getBucketName();
    BlobId blobId = BlobId.of(bucketName, path);
    return storage.readAllBytes(blobId);

}

@Override
public List<String> getDirectoryOfFileNames(String directory) {
    String bucketName = GcpUtil.getBucketName();
    Page<Blob> buckets = storage.list(bucketName, Storage.BlobListOption.currentDirectory(),
            Storage.BlobListOption.prefix(directory));
    Iterator<Blob> blobIterator = buckets.iterateAll().iterator();
    List<String> fileNames = new ArrayList<>();

    while (blobIterator.hasNext()) {
        Blob blob = blobIterator.next();
        fileNames.add(blob.getName());
    }
    Collections.sort(fileNames);
    return fileNames;
}

@Override
public int deleteDirectory(String directory) {
    String bucketName = GcpUtil.getBucketName();
    Page<Blob> buckets = storage.list(bucketName, Storage.BlobListOption.currentDirectory(),
            Storage.BlobListOption.prefix(directory));
    Iterator<Blob> blobIterator = buckets.iterateAll().iterator();
    List<BlobId> blobIds = new ArrayList<>();
    while (blobIterator.hasNext()) {
        Blob blob = blobIterator.next();
        BlobId blobId = blob.getBlobId();
        blobIds.add(blobId);
    }

    return storage.delete(blobIds).size();
}

private String getFolderMd5Path(String pltCode) {
    return DigestUtils.md5Hex(GcpUtil.getBucketName() + pltCode).substring(0, 10) + "/";
}

private String generateFileName(String md5Hex, String fileSuffix) {
    return md5Hex.substring(0, 16) + "." + fileSuffix;
}

}


package com.galaxy.service.impl;

import com.galaxy.enumeration.ReportDocumentType; import com.galaxy.module.exception.BusinessException; import com.galaxy.service.ReportDocumentService; import org.springframework.stereotype.Service;

import java.util.List;

@Service public class PDFDocumentServiceImpl implements ReportDocumentService { @Override public boolean accept(ReportDocumentType reportDocumentType) { return ReportDocumentType.PDF == reportDocumentType; }

@Override
public byte[] makeDocument(List<String> titles, List<List<?>> rows) throws BusinessException {
    throw new UnsupportedOperationException("目前不之支援產生pdf");
}

@Override
public byte[] combineDocument(List<byte[]> documents) {
    throw new UnsupportedOperationException("目前不之支援產生pdf");
}

}



package com.galaxy.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.galaxy.converter.ReportConverter; import com.galaxy.enumeration.ReportStatus; import com.galaxy.enumeration.ReportType; import com.galaxy.model.vo.ReportDownloadRecordVo; import com.galaxy.model.vo.ReportQryVo; import com.galaxy.module.Sender; import com.galaxy.module.rabbit.model.RabbitSendWith; import com.galaxy.service.ReportDownloadRecordService; import com.galaxy.storage.rdbms.entity.ReportDownloadRecordEntity; import com.galaxy.storage.rdbms.mapper.ReportDownloadRecordMapper; import lombok.NonNull; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;

import java.time.Duration; import java.time.OffsetDateTime; import java.util.List; import java.util.Map; import java.util.Objects;

import static com.galaxy.mq.rabbit.config.ReportConfig.REPORT_TOPIC_EX; import static com.galaxy.mq.rabbit.config.ReportConfig.REPORT_TOPIC_ROUTING_KEY_RULE;

@Service public class ReportDownloadRecordServiceImpl implements ReportDownloadRecordService { private final ReportDownloadRecordMapper reportDownloadRecordMapper;

private final Sender<RabbitSendWith> rabbitSender;

public ReportDownloadRecordServiceImpl(ReportDownloadRecordMapper reportDownloadRecordMapper,
                                       @Qualifier("normalRabbitSender") Sender<RabbitSendWith> rabbitSender) {
    this.reportDownloadRecordMapper = reportDownloadRecordMapper;
    this.rabbitSender = rabbitSender;
}

@Transactional(rollbackFor = Exception.class)
@Override
public void createAndSendQry(ReportType reportType, String searchParamHash, Map<String, Object> searchParam, String account, Long accountId) {
    ReportDownloadRecordEntity entity = ReportDownloadRecordEntity.builder()
            .source(reportType.getDesc())
            .reportType(reportType)
            .status(ReportStatus.RUNNING)
            .searchParamHash(searchParamHash)
            .searchParam(searchParam)
            .creator(account)
            .creatorId(accountId)
            .build();
    reportDownloadRecordMapper.insertRecord(entity);

    String routingKey = String.format(REPORT_TOPIC_ROUTING_KEY_RULE, reportType.getType().toLowerCase());
    ReportQryVo qryVo = ReportConverter.INSTANCES.toQryVo(entity);
    rabbitSender.send(new RabbitSendWith(REPORT_TOPIC_EX, routingKey), qryVo);

}


@Transactional(rollbackFor = Exception.class)
@Override
public void updateStatusToFail(@NonNull Long id, @NonNull String errorMessage) {
    ReportDownloadRecordEntity vo = ReportDownloadRecordEntity.builder()
            .id(id)
            .status(ReportStatus.FAIL)
            .errorMessage(errorMessage)
            .build();
    reportDownloadRecordMapper.updateById(vo);
}

@Override
public boolean isRunningStatus(@NonNull Long id) {
    LambdaQueryWrapper<ReportDownloadRecordEntity> queryWrapper = Wrappers.lambdaQuery();
    queryWrapper
            .eq(ReportDownloadRecordEntity::getId, id)
            .eq(ReportDownloadRecordEntity::getStatus, ReportStatus.RUNNING);
    return reportDownloadRecordMapper.exists(queryWrapper);
}

@Override
public boolean duplicateQry(String searchParamHash, Long accountId, Duration time) {
    OffsetDateTime queryTime = OffsetDateTime.now().minus(time);
    LambdaQueryWrapper<ReportDownloadRecordEntity> queryWrapper = Wrappers.lambdaQuery();
    queryWrapper
            .eq(ReportDownloadRecordEntity::getCreatorId, accountId)
            .eq(ReportDownloadRecordEntity::getSearchParamHash, searchParamHash)
            .in(ReportDownloadRecordEntity::getStatus, List.of(ReportStatus.SUCCESS, ReportStatus.RUNNING))
            .gt(ReportDownloadRecordEntity::getCreateTime, queryTime);
    return reportDownloadRecordMapper.exists(queryWrapper);
}

@Override
@Transactional(rollbackFor = Exception.class)
public void updateStatusFail(Long id) {
    reportDownloadRecordMapper.updateStatusFail(id, "make report timeout");
}

@Override
@Transactional(rollbackFor = Exception.class)
public void updateStatus(Long id, String filePath, String errorMessage) {
    ReportStatus status = Objects.isNull(errorMessage) ? ReportStatus.SUCCESS : ReportStatus.FAIL;
    reportDownloadRecordMapper.updateRecord(id, filePath, status, errorMessage);
}

@Override
public ReportDownloadRecordVo qryRecord(Long id, ReportType reportType) {
    LambdaQueryWrapper<ReportDownloadRecordEntity> queryWrapper = Wrappers.lambdaQuery();
    queryWrapper
            .eq(ReportDownloadRecordEntity::getId, id)
            .eq(ReportDownloadRecordEntity::getReportType, reportType)
            .eq(ReportDownloadRecordEntity::getStatus, ReportStatus.SUCCESS);

    return ReportConverter.INSTANCES.toVo(reportDownloadRecordMapper.selectOne(queryWrapper));
}

}



package com.galaxy.service;

import com.galaxy.enumeration.ReportDocumentType; import com.galaxy.model.vo.UploadFileVo; import org.springframework.web.multipart.MultipartFile;

import java.util.List;

public interface FileManageService {

void createBlob(String filePath, MultipartFile multipartFile);

void createBlob(String path, byte[] content);

String getDocumentPrefixPath(ReportDocumentType reportDocumentType);

String getImagePath(UploadFileVo updateFileVo, Integer imagePathType);

String getVideoPath(UploadFileVo updateFileVo, Integer videoPathType);

byte[] getDocument(String filePath);

List<String> getDirectoryOfFileNames(String directory);

int deleteDirectory(String directory);

}


package com.galaxy.service;

import com.galaxy.enumeration.ReportDocumentType; import com.galaxy.module.exception.BusinessException;

import java.util.List;

public interface ReportDocumentService { boolean accept(ReportDocumentType reportDocumentType);

byte[] makeDocument(List<String> titles, List<List<?>> rows) throws BusinessException;

byte[] combineDocument(List<byte[]> documents);

}



package com.galaxy.service;

import com.galaxy.enumeration.ReportType; import com.galaxy.model.vo.ReportDownloadRecordVo;

import java.time.Duration; import java.util.Map;

public interface ReportDownloadRecordService {

void createAndSendQry(ReportType reportType, String searchParamHash, Map<String, Object> searchParam, String account, Long accountId);

boolean isRunningStatus(Long id);

void updateStatusToFail(Long id, String errorMessage);

boolean duplicateQry(String searchParamHash, Long accountId, Duration ofMinutes);

void updateStatusFail(Long id);

void updateStatus(Long id, String documentPath, String errorMessage);

ReportDownloadRecordVo qryRecord(Long id, ReportType reportType);

}



package com.galaxy.storage.rdbms.entity;

import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler; import com.galaxy.enumeration.ReportStatus; import com.galaxy.enumeration.ReportType; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor;

import java.time.OffsetDateTime; import java.util.Map;

@Data @Builder @NoArgsConstructor @AllArgsConstructor @TableName(value = "report_download_record", autoResultMap = true) public class ReportDownloadRecordEntity {

@TableId(type = IdType.AUTO)
private Long id;
private String source; // 报表来源,中文說明
private ReportType reportType; // 報表名稱
private String filePath; // 檔案路徑
private ReportStatus status; // RUNNING:處理中 SUCCESS:成功 FAIL:失敗
private String searchParamHash; // md5(searchParam)
@TableField(typeHandler = JacksonTypeHandler.class)
private Map<String, Object> searchParam; // 查詢條件
private String errorMessage; // 錯誤訊息
private Long creatorId;
private String creator;
private OffsetDateTime createTime;

}


package com.galaxy.storage.rdbms.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.galaxy.enumeration.ReportStatus; import com.galaxy.storage.rdbms.entity.ReportDownloadRecordEntity; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository;

@Mapper @Repository public interface ReportDownloadRecordMapper extends BaseMapper {

int insertRecord(ReportDownloadRecordEntity gameMerchantEntity);

void updateStatusFail(@Param("id") Long id, @Param("errorMessage") String errorMessage);

void updateRecord(@Param("id") Long id, @Param("filePath") String filePath, @Param("status") ReportStatus status, @Param("errorMessage") String errorMessage);

}


<insert id="insertRecord" useGeneratedKeys="true" keyProperty="id">
    INSERT INTO report_download_record (source, report_type, status, search_param_hash, search_param,
                                        error_message, creator_id, creator)
    VALUES (#{source}, #{reportType}, #{status}, #{searchParamHash},
            #{searchParam, typeHandler=com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler}::jsonb,
            #{errorMessage}, #{creatorId}, #{creator})
</insert>

<update id="updateStatusFail">
    update report_download_record
    set status        = 'FAIL',
        error_message = #{errorMessage}
    where id = #{id}
      and status = 'RUNNING'
</update>

<update id="updateRecord">
    update report_download_record
    set status        = #{status},
        file_path     = #{filePath},
        error_message = #{errorMessage}
    where id = #{id}
      and status = 'RUNNING'
</update>




BEGIN; DROP TABLE IF EXISTS plt_basics.vs_report_download_record; CREATE TABLE IF NOT EXISTS plt_basics.vs_report_download_record ( id INT4 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, source VARCHAR(64) NOT NULL, report_type VARCHAR(64) NOT NULL, file_path VARCHAR(256), status VARCHAR(8) NOT NULL DEFAULT 'RUNNING', search_param_hash VARCHAR(32) NOT NULL, search_param jsonb NOT NULL DEFAULT '{}', error_message VARCHAR(256), creator_id INT NOT NULL, creator VARCHAR(32) NOT NULL, create_time TIMESTAMP WITH TIME ZONE DEFAULT now() );

-- index by creator_id search_param_hash status CREATE INDEX index_c_s ON plt_basics.vs_report_download_record USING btree (creator_id, search_param_hash);

COMMENT ON TABLE plt_basics.vs_report_download_record IS '報表下載紀錄'; COMMENT ON COLUMN plt_basics.vs_report_download_record.id IS 'ID'; COMMENT ON COLUMN plt_basics.vs_report_download_record.source IS '报表来源說明'; COMMENT ON COLUMN plt_basics.vs_report_download_record.report_type IS '報表名稱'; COMMENT ON COLUMN plt_basics.vs_report_download_record.file_path IS '檔案路徑'; COMMENT ON COLUMN plt_basics.vs_report_download_record.status IS '報表執行狀態:RUNNING:處理中,SUCCESS:成功,FAIL:失敗'; COMMENT ON COLUMN plt_basics.vs_report_download_record.search_param_hash IS 'md5(searchParam)'; COMMENT ON COLUMN plt_basics.vs_report_download_record.search_param IS '查詢條件:json'; COMMENT ON COLUMN plt_basics.vs_report_download_record.error_message IS '錯誤訊息'; COMMENT ON COLUMN plt_basics.vs_report_download_record.creator_id IS '創建者id'; COMMENT ON COLUMN plt_basics.vs_report_download_record.creator IS '創建者帳號'; COMMENT ON COLUMN plt_basics.vs_report_download_record.create_time IS '創建時間';

COMMIT; ```