Multi-Stage Message Queue Workflow with Redis Coordination
The Multi-Stage Message Queue Workflow with Redis Coordination is a distributed system architecture designed to handle complex data retrieval and file generation tasks, such as cross-database report generation.^[001-TODO__報表下載.md] This pattern utilizes multiple Message Queue (MQ) stages to orchestrate asynchronous tasks while using Redis as a temporary state store and coordination mechanism to manage intermediate data and completion status.^[001-TODO__報表下載.md]
System Architecture
The workflow is typically implemented using a message broker (such as RabbitMQ) and Redis, often within a microservices environment like Spring Boot.^[001-TODO__報表下載.md] The system manages asynchronous requests where a user initiates a process, and the result is eventually stored in a cloud storage bucket (e.g., GCP Cloud Storage) for download.^[001-TODO__報表下載.md]
Persistence Layer
The workflow relies on a database entity, such as FileDownloadRecordEntity, to track the state of the job.^[001-TODO__報表下載.md] This record stores metadata including:
* Source and enumeration type of the report^[001-TODO__報表下載.md]
* Data retrieval criteria keys^[001-TODO__報表下載.md]
* File path/URL^[001-TODO__報表下載.md]
* Associated user/department IDs^[001-TODO__報表下載.md]
* Timestamps for creation and completion^[001-TODO__報表下載.md]
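The metadata listed above could be modelled roughly as follows. This is a minimal plain-Java sketch, not the actual entity: the class name `FileDownloadRecordSketch`, every field name, and the `Status` values are assumptions made for illustration, and a real Spring Boot project would likely add persistence annotations.

```java
import java.time.Instant;

/** Hypothetical stand-in for the job record; all names here are assumptions. */
final class FileDownloadRecordSketch {
    enum Status { PENDING, COMPLETED, FAILED }

    final long id;
    final String source;        // source of the report
    final String reportType;    // enumeration type of the report
    final String criteriaKey;   // key identifying the data retrieval criteria
    final long userId;          // user who requested the file
    final long departmentId;    // department of the requesting user
    final Instant createdAt;    // creation timestamp

    // Filled in when the job finishes.
    String filePath;            // file path/URL in cloud storage
    Instant completedAt;        // completion timestamp
    Status status = Status.PENDING;

    FileDownloadRecordSketch(long id, String source, String reportType, String criteriaKey,
                             long userId, long departmentId, Instant createdAt) {
        this.id = id;
        this.source = source;
        this.reportType = reportType;
        this.criteriaKey = criteriaKey;
        this.userId = userId;
        this.departmentId = departmentId;
        this.createdAt = createdAt;
    }

    /** Records the file location and marks the job as finished. */
    void markCompleted(String filePath, Instant when) {
        this.filePath = filePath;
        this.completedAt = when;
        this.status = Status.COMPLETED;
    }

    /** Marks the job as failed, for example after retries are exhausted. */
    void markFailed(Instant when) {
        this.completedAt = when;
        this.status = Status.FAILED;
    }
}
```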
Coordination with Redis
Redis is used to store intermediate query results temporarily and coordinate the transition between workflow stages.^[001-TODO__報表下載.md] A Redis Hash structure is employed for each job, using a key pattern based on the record ID and table details (e.g., FileDownloadRecordEntity-{id}:{table}-{id}).^[001-TODO__報表下載.md]
The data structure typically includes:
* queryDoneCount: A counter incremented by listeners as they finish their specific sub-tasks.^[001-TODO__報表下載.md]
* Result lists: Data retrieved by specific listeners (e.g., plt_user.vs_user_tag_relation.user_id), stored as lists within the hash.^[001-TODO__報表下載.md]
* TTL: A time-to-live (e.g., 10 minutes) is set so that the temporary data does not persist indefinitely.^[001-TODO__報表下載.md]
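To make the shape of this per-job state concrete, here is a small in-memory sketch. It is not a Redis client: `JobHashSketch` and its methods are hypothetical, the counter only mirrors what `HINCRBY` would do, and the expiry timestamp only models a TTL. The reading of the key pattern (record ID, table name, then a second ID) is also an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory stand-in for one job's Redis Hash; a sketch, not a Redis client. */
final class JobHashSketch {
    final String key;
    final Instant expiresAt;  // models the TTL placed on the Redis key
    private final AtomicLong queryDoneCount = new AtomicLong();
    private final Map<String, List<String>> resultLists = new ConcurrentHashMap<>();

    JobHashSketch(long recordId, String table, long tableId, Instant now, Duration ttl) {
        // Assumed reading of the pattern FileDownloadRecordEntity-{id}:{table}-{id}.
        this.key = "FileDownloadRecordEntity-" + recordId + ":" + table + "-" + tableId;
        this.expiresAt = now.plus(ttl);
    }

    /** Appends rows under a field name such as "plt_user.vs_user_tag_relation.user_id". */
    void appendResults(String field, List<String> rows) {
        resultLists.computeIfAbsent(field, f -> new CopyOnWriteArrayList<>()).addAll(rows);
    }

    /** Mirrors HINCRBY on queryDoneCount with an increment of 1; returns the new value. */
    long markQueryDone() {
        return queryDoneCount.incrementAndGet();
    }

    long queryDoneCount() {
        return queryDoneCount.get();
    }

    List<String> results(String field) {
        return resultLists.getOrDefault(field, List.of());
    }

    /** True once the TTL has elapsed and the temporary data should be treated as gone. */
    boolean isExpired(Instant now) {
        return !now.isBefore(expiresAt);
    }
}
```

Keeping the counter and the result lists under one key means a single TTL covers all temporary data for the job.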
Workflow Stages
The execution is divided into multiple sequential MQ stages, often involving a combination of direct processing and fan-out patterns to parallelize data retrieval.^[001-TODO__報表下載.md]
Stage 1: Parallel Data Retrieval
The initial stage often involves a Fanout Exchange to parallelize queries across different data sources or tables.^[001-TODO__報表下載.md]
- Trigger: A message containing the `FileDownloadRecordEntity` ID is sent to a Fanout Exchange (MQ2).^[001-TODO__報表下載.md]
- Execution:
  - Multiple listeners subscribe to the exchange, each through its own queue, so that every listener receives a copy of the message.
  - Each listener is responsible for querying a specific table (e.g., `plt_user`, `plt_fund`).
  - Retrieved data is pushed into the corresponding Redis Hash.
  - The `queryDoneCount` in Redis is incremented (HINCRBY) for each completed query.^[001-TODO__報表下載.md]
- Transition: Once all listeners for this stage have updated the counter (e.g., `queryDoneCount >= 2`), the process moves to the next stage.^[001-TODO__報表下載.md]
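The fan-out step can be simulated without a broker. In the sketch below, threads stand in for the per-table listeners, a concurrent map stands in for the Redis Hash, and an `AtomicLong` stands in for `HINCRBY`. The class `FanoutStageSketch`, its `run` method, and the 10-second wait are assumptions for illustration only; the map of queries is assumed to be non-empty.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Simulates Stage 1: one message fanned out to one listener per table. */
final class FanoutStageSketch {
    final Map<String, List<String>> results = new ConcurrentHashMap<>(); // stands in for the Redis Hash
    final AtomicLong queryDoneCount = new AtomicLong();                  // stands in for HINCRBY

    /** Runs every table query in parallel; returns true if all listeners reported completion. */
    boolean run(Map<String, Supplier<List<String>>> tableQueries) {
        ExecutorService pool = Executors.newFixedThreadPool(tableQueries.size());
        tableQueries.forEach((table, query) -> pool.execute(() -> {
            results.put(table, query.get());   // push the retrieved rows
            queryDoneCount.incrementAndGet();  // signal that this sub-task is done
        }));
        pool.shutdown(); // accept no new tasks; submitted ones keep running
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        // Transition condition: every listener has incremented the counter.
        return queryDoneCount.get() >= tableQueries.size();
    }
}
```

With two queries registered (for example `plt_user` and `plt_fund`), the final check corresponds to the `queryDoneCount >= 2` condition. A listener whose query throws never increments the counter, so the stage would not transition.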
Stage 2: Sub-queries or Further Processing
Subsequent stages may perform sub-queries or additional processing based on the data gathered in the previous stage.^[001-TODO__報表下載.md]
- Data Ingestion: Listeners in this stage might query other tables (e.g., `plt_account`) and push their results to the same Redis structure.^[001-TODO__報表下載.md]
- Aggregation: Intermediate data might be organized or aggregated in Redis during these stages.^[001-TODO__報表下載.md]
Stage 3: Finalization and File Generation
The final stage involves gathering the coordinated data from Redis to produce the output file.^[001-TODO__報表下載.md]
- Condition: A processing listener checks the Redis `queryDoneCount` to ensure all prerequisite data is ready.^[001-TODO__報表下載.md]
- Action: The system aggregates the data from Redis, converts it to the target format (e.g., CSV), and uploads it to cloud storage.^[001-TODO__報表下載.md]
- Completion: The `FileDownloadRecordEntity` is updated with the final status and file location.^[001-TODO__報表下載.md]
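The condition and action steps might look roughly like this. The sketch covers only the readiness check and CSV conversion; the upload to cloud storage and the record update are left out. `FinalizeStageSketch` and its method names are hypothetical, and the quoting rule (wrap fields containing commas, quotes, or line breaks in double quotes) is a common CSV convention assumed here rather than taken from the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Sketch of Stage 3: check readiness, then turn gathered rows into CSV text. */
final class FinalizeStageSketch {

    /** Quotes a field when it contains a comma, a double quote, or a line break. */
    static String escape(String field) {
        boolean needsQuotes = field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r");
        return needsQuotes ? "\"" + field.replace("\"", "\"\"") + "\"" : field;
    }

    /** Renders a header and data rows as CSV, one line per row. */
    static String toCsv(List<String> header, List<List<String>> rows) {
        StringBuilder sb = new StringBuilder();
        appendRow(sb, header);
        for (List<String> row : rows) {
            appendRow(sb, row);
        }
        return sb.toString();
    }

    private static void appendRow(StringBuilder sb, List<String> row) {
        List<String> cells = new ArrayList<>();
        for (String cell : row) {
            cells.add(escape(cell));
        }
        sb.append(String.join(",", cells)).append("\n");
    }

    /** Returns the CSV only when every prerequisite query has reported completion. */
    static Optional<String> finalizeIfReady(long queryDoneCount, long required,
                                            List<String> header, List<List<String>> rows) {
        if (queryDoneCount < required) {
            return Optional.empty(); // not ready yet; the caller may re-queue the message
        }
        return Optional.of(toCsv(header, rows));
    }
}
```

Returning an empty `Optional` instead of throwing keeps the "not ready" case separate from genuine errors, which suits a listener that simply tries again later.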
Error Handling and Retry Logic
The workflow handles stages that are not yet complete, or that fail, by re-queueing the message for a later attempt.^[001-TODO__報表下載.md]
- Retry Strategy: The message processing logic (e.g., `processMessageOrRequeue`) typically allows for a fixed number of re-queue attempts (default: 6 times) with a delay between attempts (e.g., 1 minute).^[001-TODO__報表下載.md]
- Failure State: If the retries are exhausted without meeting the completion conditions (e.g., `queryDoneCount` never reaches the required threshold), the system updates the `FileDownloadRecordEntity` status to indicate failure.^[001-TODO__報表下載.md]
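The retry decision can be sketched as a pure function. Only the name `processMessageOrRequeue`, the limit of 6 re-queues, and the 1-minute delay come from the description above. The signature, the `Outcome` enum, and the idea of carrying a re-queue count with the message are assumptions. Republishing the message and updating the record are left to the caller.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Sketch of the re-queue decision; the signature and Outcome type are hypothetical. */
final class RequeueSketch {
    enum Outcome { PROCESSED, REQUEUED, FAILED }

    static final int MAX_REQUEUES = 6;                           // default named in the text
    static final Duration REQUEUE_DELAY = Duration.ofMinutes(1); // example delay from the text

    /**
     * Decides what to do with one delivery of a message.
     *
     * @param requeueCount how many times this message has already been re-queued
     * @param ready        completion check, e.g. queryDoneCount >= required threshold
     * @param process      the work to run once the check passes
     */
    static Outcome processMessageOrRequeue(int requeueCount, BooleanSupplier ready, Runnable process) {
        if (ready.getAsBoolean()) {
            process.run();
            return Outcome.PROCESSED;
        }
        if (requeueCount < MAX_REQUEUES) {
            // Caller republishes with requeueCount + 1 after REQUEUE_DELAY.
            return Outcome.REQUEUED;
        }
        // Retries exhausted: caller marks the record as failed.
        return Outcome.FAILED;
    }
}
```

Under these assumptions a message that never becomes ready is delivered seven times in total (the first delivery plus six re-queues) before the job is marked as failed.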
Related Concepts
- [[Message Queue Patterns]]
- [[Redis]]
- [[Event-Driven Architecture]]
- [[Google Cloud Platform]]
Sources
001-TODO__報表下載.md