# FanoutExchange Data Aggregation
FanoutExchange Data Aggregation is a distributed processing pattern designed to handle complex, multi-project data joins and report generation within a microservices architecture^[001-todo.md]. It leverages message queues to decompose a heavy query task into parallel sub-queries managed by independent listeners, utilizing temporary storage to aggregate partial results^[001-todo.md].
## Architecture and Workflow
The system is structured as a multi-stage pipeline where data is fetched, aggregated, and processed asynchronously^[001-todo.md].
### Stage 1: Parallel Data Retrieval (MQ1 & MQ2)
The process begins with a Message Queue worker (MQ1) that uses a `processMessageOrRequeue` mechanism^[001-todo.md]. This worker listens for a trigger message containing the ID of a `FileDownloadRecordEntity`^[001-todo.md].
Once triggered, the message is routed to a `FanoutExchange` (MQ2)^[001-todo.md]. This exchange broadcasts the message to multiple listeners, each responsible for querying a specific data source^[001-todo.md].
- Listener 1: Queries `plt_user.vs_user_tag_relation`.
- Listener 2: Queries `plt_fund.vs_withdraw`.
These listeners execute their queries and place the resulting data into a structured Redis Hash^[001-todo.md].
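The fan-out step above can be sketched as follows. This is a minimal illustration, not the production code: a plain dict stands in for Redis, the listener function and the fake query results are hypothetical, and in the real system the listeners run in parallel from the broadcast message.

```python
# Sketch: one trigger fanned out to two listeners, each writing its partial
# result into a shared hash keyed by the FileDownloadRecordEntity ID.
def fake_query(source: str) -> list:
    # Hypothetical result sets; the real listeners query the actual tables.
    return {"plt_user.vs_user_tag_relation": [{"user_id": 7, "tag": "vip"}],
            "plt_fund.vs_withdraw": [{"user_id": 7, "amount": 100}]}[source]

def listener(source: str, record_id: int, store: dict) -> None:
    """Simulates one fanout listener: run its query, stash the rows under a
    listener-specific field, and bump queryDoneCount (hincrby equivalent)."""
    rows = fake_query(source)
    key = f"FileDownloadRecordEntity-{record_id}"
    h = store.setdefault(key, {"queryDoneCount": 0})
    h[source] = rows
    h["queryDoneCount"] += 1

store = {}
for src in ("plt_user.vs_user_tag_relation", "plt_fund.vs_withdraw"):
    listener(src, 1111, store)  # in production these run concurrently

print(store["FileDownloadRecordEntity-1111"]["queryDoneCount"])  # → 2
```

Because each listener only increments its own counter and writes its own field, the listeners never contend over each other's data; the counter is the sole coordination point.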
### Stage 2: Intermediate Data Aggregation (MQ3)
After the initial data retrieval, a second queue worker (MQ3) processes the intermediate data^[001-todo.md]. It checks the `queryDoneCount` within the Redis Hash; if the count meets the threshold (e.g., ≥ 2), it proceeds to the next step^[001-todo.md]. This stage handles operations such as transforming the data into CSV format or uploading it to Google Cloud Platform (GCP) Cloud Storage^[001-todo.md].
### Stage 3: Secondary Enrichment (MQ4)
In some workflows, an additional `FanoutExchange` (MQ4) is required to enrich the data further^[001-todo.md]. Similar to Stage 1, it dispatches listeners to fetch supplementary data (e.g., `plt_fund.vs_payment`), writing the new results back to the same Redis Hash structure^[001-todo.md].
## Data Storage Structure
The aggregation state and temporary data are maintained in Redis using a specific Hash structure^[001-todo.md].
- Key: `FileDownloadRecordEntity-{ID}` (e.g., `FileDownloadRecordEntity-1111`).
- TTL: 10 minutes^[001-todo.md].
- Fields:
  - `queryDoneCount`: A counter incremented by each listener (`hincrby`) to track completion status^[001-todo.md].
  - Dynamic keys for each listener's dataset (e.g., `plt_user.vs_user_tag_relation.user_id`, containing the list of results).
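A minimal sketch of this key/TTL bookkeeping, using an in-memory stand-in for Redis (real code would issue `HSET`/`HINCRBY`/`EXPIRE` against an actual Redis instance). The entity ID and 10-minute TTL come from the structure above; the stand-in class and sample values are illustrative.

```python
import time

TTL_SECONDS = 600  # 10 minutes, per the structure above

def record_key(record_id: int) -> str:
    """Builds the hash key for one FileDownloadRecordEntity."""
    return f"FileDownloadRecordEntity-{record_id}"

class FakeRedisHash:
    """Just enough hset/hincrby/expire semantics for illustration."""
    def __init__(self):
        self.data, self.expires_at = {}, {}
    def hset(self, key, field, value):
        self.data.setdefault(key, {})[field] = value
    def hincrby(self, key, field, amount=1):
        h = self.data.setdefault(key, {})
        h[field] = int(h.get(field, 0)) + amount
        return h[field]
    def expire(self, key, ttl):
        self.expires_at[key] = time.time() + ttl

r = FakeRedisHash()
key = record_key(1111)
r.hset(key, "plt_user.vs_user_tag_relation.user_id", [7, 8, 9])
r.hincrby(key, "queryDoneCount")  # each listener calls this once
r.expire(key, TTL_SECONDS)        # stale partial results self-clean
```

The TTL means an aggregation that stalls (e.g., a listener never reports) leaves no permanent garbage in Redis; the retry/failure path below handles notifying the caller.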
## Error Handling and Reliability
The workflow relies on a robust retry mechanism to handle transient failures^[001-todo.md].
- Retry Strategy: The `processMessageOrRequeue` function is configured with a default of 6 retry attempts and a 1-minute delay between attempts^[001-todo.md].
- Failure State: If all retries are exhausted, the system automatically updates the `status` of the `FileDownloadRecordEntity` to indicate failure^[001-todo.md].
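The retry policy can be sketched as below. This is an assumption-laden simplification: the real `processMessageOrRequeue` requeues the message via the broker rather than looping in-process, and the handler, entity dict, and `"FAILED"` status value are hypothetical. The delay is injected so the example runs instantly.

```python
MAX_RETRIES = 6            # default attempt count from the source
RETRY_DELAY_SECONDS = 60   # 1-minute delay between attempts

def process_or_requeue(handler, entity, sleep=lambda s: None):
    """Try the handler up to MAX_RETRIES times; on exhaustion, flag the
    FileDownloadRecordEntity as failed so the caller can surface the error."""
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            handler(entity)
            return True
        except Exception:
            if attempt < MAX_RETRIES:
                sleep(RETRY_DELAY_SECONDS)  # real code requeues with a delay
    entity["status"] = "FAILED"  # retries exhausted -> persist failure state
    return False

calls = []
def always_fails(entity):
    calls.append(1)
    raise RuntimeError("transient error")

entity = {"id": 1111, "status": "PROCESSING"}
ok = process_or_requeue(always_fails, entity)  # → False after 6 attempts
```

Marking the entity failed (rather than retrying forever) is what lets the download UI show a terminal error instead of spinning indefinitely.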
## Related Concepts
- [[FileDownloadRecordEntity]]
- [[Redis]]
- [[Google Cloud Platform]]
## Sources
^[001-todo.md]