Multi-Stage Message Queue Workflow with Redis Coordination

The Multi-Stage Message Queue Workflow with Redis Coordination is a distributed system architecture designed to handle complex data retrieval and file generation tasks, such as cross-database report generation.^[001-TODO__報表下載.md] This pattern utilizes multiple Message Queue (MQ) stages to orchestrate asynchronous tasks while using Redis as a temporary state store and coordination mechanism to manage intermediate data and completion status.^[001-TODO__報表下載.md]

System Architecture

The workflow is typically implemented using a message broker (such as RabbitMQ) and Redis, often in microservices built with a framework such as Spring Boot.^[001-TODO__報表下載.md] The system handles asynchronous requests: a user initiates a job, and the result is eventually stored in a cloud storage bucket (e.g., GCP Cloud Storage) for download.^[001-TODO__報表下載.md]

Persistence Layer

The workflow relies on a database entity, such as FileDownloadRecordEntity, to track the state of the job.^[001-TODO__報表下載.md] This record stores metadata including:

  • Source and enumeration type of the report^[001-TODO__報表下載.md]
  • Data retrieval criteria keys^[001-TODO__報表下載.md]
  • File path/URL^[001-TODO__報表下載.md]
  • Associated user/department IDs^[001-TODO__報表下載.md]
  • Timestamps for creation and completion^[001-TODO__報表下載.md]
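
To make the shape of this record concrete, here is a minimal sketch of it as a Python dataclass. The class name, field names, and the JobStatus values are hypothetical; the source only lists the kinds of metadata stored, not the actual schema.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional


class JobStatus(Enum):
    # Hypothetical status values; the source does not enumerate them.
    PENDING = "PENDING"
    DONE = "DONE"
    FAILED = "FAILED"


@dataclass
class FileDownloadRecord:
    """Illustrative stand-in for FileDownloadRecordEntity."""

    id: int
    source: str                 # where the report request came from
    report_type: str            # enumeration type of the report
    criteria_keys: List[str]    # data retrieval criteria keys
    user_id: int
    department_id: int
    file_url: Optional[str] = None
    status: JobStatus = JobStatus.PENDING
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    completed_at: Optional[datetime] = None

    def mark_done(self, file_url: str) -> None:
        """Record the file location and the completion timestamp."""
        self.file_url = file_url
        self.status = JobStatus.DONE
        self.completed_at = datetime.now(timezone.utc)
```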

Coordination with Redis

Redis is used to store intermediate query results temporarily and coordinate the transition between workflow stages.^[001-TODO__報表下載.md] A Redis Hash structure is employed for each job, using a key pattern based on the record ID and table details (e.g., FileDownloadRecordEntity-{id}:{table}-{id}).^[001-TODO__報表下載.md]

The data structure typically includes:

  • queryDoneCount: A counter incremented by listeners as they finish their specific sub-tasks^[001-TODO__報表下載.md].
  • Result Lists: Data retrieved by specific listeners (e.g., plt_user.vs_user_tag_relation.user_id) stored as lists within the hash^[001-TODO__報表下載.md].
  • TTL: A time-to-live (e.g., 10 minutes) is set to ensure the temporary data does not persist indefinitely^[001-TODO__報表下載.md].
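
The layout above can be sketched with a small in-memory stand-in for the Redis commands involved (HSET, HINCRBY, HGET, EXPIRE). This is an illustration, not a Redis client: FakeRedisHashes and job_key are hypothetical names, the meaning of the second id in the key pattern is not specified in the source and is treated as an opaque value, and result lists are serialized as JSON on the assumption that hash fields hold string values.

```python
import json
import time

TTL_SECONDS = 10 * 60  # the 10-minute example from the text


def job_key(record_id, table, table_id):
    # Follows the pattern FileDownloadRecordEntity-{id}:{table}-{id} literally.
    return f"FileDownloadRecordEntity-{record_id}:{table}-{table_id}"


class FakeRedisHashes:
    """In-memory stand-in for HSET / HINCRBY / HGET / EXPIRE."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._hashes = {}   # key -> {field: string value}
        self._expiry = {}   # key -> absolute deadline on the clock

    def _live(self, key):
        # Drop the key if its TTL has elapsed, then return the hash (or None).
        deadline = self._expiry.get(key)
        if deadline is not None and self._clock() >= deadline:
            self._hashes.pop(key, None)
            self._expiry.pop(key, None)
        return self._hashes.get(key)

    def hset(self, key, field, value):
        self._live(key)
        self._hashes.setdefault(key, {})[field] = str(value)

    def hincrby(self, key, field, amount=1):
        self._live(key)
        h = self._hashes.setdefault(key, {})
        h[field] = str(int(h.get(field, "0")) + amount)
        return int(h[field])

    def hget(self, key, field):
        h = self._live(key)
        return None if h is None else h.get(field)

    def expire(self, key, seconds):
        if self._live(key) is not None:
            self._expiry[key] = self._clock() + seconds


# Usage: one listener stores its result list, bumps the counter, sets the TTL.
store = FakeRedisHashes()
key = job_key(42, "plt_user", 1)
store.hset(key, "plt_user.vs_user_tag_relation.user_id", json.dumps([101, 102]))
store.hincrby(key, "queryDoneCount")
store.expire(key, TTL_SECONDS)
```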

Workflow Stages

The execution is divided into multiple sequential MQ stages, often involving a combination of direct processing and fan-out patterns to parallelize data retrieval.^[001-TODO__報表下載.md]

Stage 1: Parallel Data Retrieval

The initial stage often involves a Fanout Exchange to parallelize queries across different data sources or tables.^[001-TODO__報表下載.md]

  1. Trigger: A message containing the FileDownloadRecordEntity ID is sent to a Fanout Exchange (MQ2).^[001-TODO__報表下載.md]
  2. Execution:
    • Multiple listeners consume from queues bound to the exchange, so each listener receives its own copy of the message.
    • Each listener is responsible for querying a specific table (e.g., plt_user, plt_fund).
    • Retrieved data is pushed into the corresponding Redis Hash.
    • The queryDoneCount in Redis is incremented (HINCRBY) for each completed query^[001-TODO__報表下載.md].
  3. Transition: Once all listeners for this stage have updated the counter (e.g., queryDoneCount >= 2), the process moves to the next stage^[001-TODO__報表下載.md].
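
The fan-out step above can be simulated without a broker. In this sketch, threads stand in for listeners that each receive a copy of the message, and a lock-protected dictionary stands in for the job's Redis hash. JobState, make_listener, fanout, and stage_complete are hypothetical names, and the queries are stubbed with fixed results.

```python
import threading

EXPECTED_QUERIES = 2  # matches the queryDoneCount >= 2 example


class JobState:
    """Thread-safe stand-in for one job's Redis hash."""

    def __init__(self):
        self._lock = threading.Lock()
        self.fields = {"queryDoneCount": 0}

    def store_result_and_count(self, field, rows):
        # In Redis this would be a write of the result followed by HINCRBY.
        with self._lock:
            self.fields[field] = rows
            self.fields["queryDoneCount"] += 1
            return self.fields["queryDoneCount"]


def make_listener(table, query):
    """Build a listener that queries one table and records its result."""
    def on_message(record_id, state):
        rows = query(record_id)
        state.store_result_and_count(table, rows)
    return on_message


def fanout(record_id, listeners, state):
    """Deliver the same message to every listener, as a fanout exchange would."""
    threads = [
        threading.Thread(target=listener, args=(record_id, state))
        for listener in listeners
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def stage_complete(state):
    """Transition check: have all expected queries reported in?"""
    return state.fields["queryDoneCount"] >= EXPECTED_QUERIES
```

Because the counter is only compared against a threshold, the order in which listeners finish does not matter; the check passes once the last one has incremented it.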

Stage 2: Sub-queries or Further Processing

Subsequent stages may perform sub-queries or additional processing based on the data gathered in the previous stage.^[001-TODO__報表下載.md]

  • Data Ingestion: Listeners in this stage might query other tables (e.g., plt_account) and push their results to the same Redis structure^[001-TODO__報表下載.md].
  • Aggregation: Intermediate data might be organized or aggregated in Redis during these stages^[001-TODO__報表下載.md].
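
As a rough illustration of a sub-query stage, the sketch below reads the IDs written by an earlier stage, looks up matching rows, and stores them in the same structure. The PLT_ACCOUNT rows, the account_no column, and the function names are hypothetical, a plain dictionary stands in for the Redis hash, and incrementing queryDoneCount in this stage is an assumption rather than something the source states.

```python
# Hypothetical rows standing in for the plt_account table.
PLT_ACCOUNT = [
    {"user_id": 101, "account_no": "A-1"},
    {"user_id": 102, "account_no": "A-2"},
    {"user_id": 999, "account_no": "A-9"},
]


def query_accounts(user_ids):
    """Stand-in for a query that filters plt_account by user ID."""
    wanted = set(user_ids)
    return [row for row in PLT_ACCOUNT if row["user_id"] in wanted]


def run_sub_query(job_hash, source_field, target_field, lookup):
    """Read IDs from an earlier stage, query by them, and store the result."""
    ids = job_hash.get(source_field, [])
    rows = lookup(ids)
    job_hash[target_field] = rows
    # Assumption: stage 2 listeners also bump the shared completion counter.
    job_hash["queryDoneCount"] = job_hash.get("queryDoneCount", 0) + 1
    return rows
```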

Stage 3: Finalization and File Generation

The final stage involves gathering the coordinated data from Redis to produce the output file.^[001-TODO__報表下載.md]

  1. Condition: A processing listener checks the Redis queryDoneCount to ensure all prerequisite data is ready^[001-TODO__報表下載.md].
  2. Action: The system aggregates the data from Redis, converts it to the target format (e.g., CSV), and uploads it to cloud storage^[001-TODO__報表下載.md].
  3. Completion: The FileDownloadRecordEntity is updated with the final status and file location^[001-TODO__報表下載.md].
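
The three steps above can be sketched as follows. A dictionary stands in for the Redis hash and for the database record, and upload is a hypothetical callable standing in for a cloud storage client. The sketch also assumes that the result lists are row-aligned, so that they can be zipped into CSV rows; the source does not say how the lists are combined.

```python
import csv
import io


def build_csv(job_hash, columns, required_count):
    """Return CSV text for the job, or None if the data is not ready yet."""
    if int(job_hash.get("queryDoneCount", 0)) < required_count:
        return None
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)
    # Assumption: each column's list is aligned by row index.
    # zip stops at the shortest list.
    for row in zip(*(job_hash[c] for c in columns)):
        writer.writerow(row)
    return buffer.getvalue()


def finalize(record, job_hash, columns, required_count, upload):
    """Generate the file, upload it, and update the record on success."""
    csv_text = build_csv(job_hash, columns, required_count)
    if csv_text is None:
        return False  # not ready; the caller decides whether to re-queue
    record["file_url"] = upload(f"report-{record['id']}.csv", csv_text)
    record["status"] = "DONE"
    return True
```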

Error Handling and Retry Logic

The workflow handles messages that cannot yet be processed by re-queuing them a bounded number of times before marking the job as failed.^[001-TODO__報表下載.md]

  • Retry Strategy: The message processing logic (e.g., processMessageOrRequeue) typically allows for a fixed number of re-queue attempts (default: 6 times) with a delay between attempts (e.g., 1 minute)^[001-TODO__報表下載.md].
  • Failure State: If the retries are exhausted without meeting the completion conditions (e.g., queryDoneCount never reaches the required threshold), the system updates the FileDownloadRecordEntity status to indicate failure^[001-TODO__報表下載.md].
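
A minimal sketch of this retry policy is shown below. Only the name processMessageOrRequeue, the default of 6 attempts, and the 1-minute delay come from the source; the message shape, the requeue_count field, and the callback parameters are assumptions made for illustration, and how the delay is implemented on the broker is left to the requeue callable.

```python
MAX_REQUEUES = 6             # default from the text
REQUEUE_DELAY_SECONDS = 60   # the 1-minute example


def process_message_or_requeue(message, is_ready, process, requeue, mark_failed):
    """Handle one delivery: process it, re-queue it with a delay, or give up."""
    record_id = message["record_id"]
    if is_ready(record_id):
        process(record_id)
        return "processed"

    attempts = message.get("requeue_count", 0)
    if attempts >= MAX_REQUEUES:
        # Retries exhausted without the completion condition being met.
        mark_failed(record_id)
        return "failed"

    # Send a copy of the message back with an incremented attempt counter.
    requeue({**message, "requeue_count": attempts + 1}, REQUEUE_DELAY_SECONDS)
    return "requeued"
```

Carrying the attempt count in the message itself keeps the handler stateless, so any consumer instance can pick up the re-queued message.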

Related Concepts

  • [[Message Queue Patterns]]
  • [[Redis]]
  • [[Event-Driven Architecture]]
  • [[Google Cloud Platform]]

Sources

  • 001-TODO__報表下載.md