
FanoutExchange Listener Pattern

The FanoutExchange Listener Pattern is a message queue processing strategy for complex data aggregation tasks, such as generating reports that join data from multiple projects^[001-TODO__報表下載.md]. The pattern uses a FanoutExchange to broadcast a message carrying a task identifier (e.g., the id of a FileDownloadRecordEntity) to multiple distinct listeners concurrently^[001-TODO__報表下載.md].

Each listener queries one specific data source independently^[001-TODO__報表下載.md]. Instead of compiling the final report directly, each listener writes the data it retrieves into a shared, temporary storage layer (here, a Redis Hash) under a key derived from the task ID^[001-TODO__報表下載.md].

This architecture enables parallel processing of different database queries (e.g., plt_user vs. plt_fund tables), thereby reducing the overall time required to gather all necessary data points for a report^[001-TODO__報表下載.md].
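The broadcast-and-collect flow described above can be sketched in memory as follows. This is a minimal illustration, not the source's implementation: the `FanoutExchange` class, `make_listener`, the two query functions, and the sample rows are all hypothetical stand-ins, a plain dict replaces the Redis Hash, and worker threads replace real broker consumers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FanoutExchange:
    """In-memory stand-in for a fanout exchange (hypothetical, not a broker client)."""

    def __init__(self):
        self._listeners = []

    def bind(self, listener):
        self._listeners.append(listener)

    def publish(self, message):
        # Every bound listener receives the same message, each in its own thread.
        workers = max(1, len(self._listeners))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [pool.submit(listener, message) for listener in self._listeners]
            for future in futures:
                future.result()  # re-raise any listener error


# A dict plus a lock stands in for the shared Redis Hash (assumption).
store = {}
store_lock = threading.Lock()


def make_listener(field, query):
    """Build a listener that runs one query and records its rows under `field`."""

    def listener(message):
        key = f"FileDownloadRecordEntity-{message['id']}"
        rows = query(message["id"])
        with store_lock:
            entry = store.setdefault(key, {"queryDoneCount": 0})
            entry[field] = rows
            entry["queryDoneCount"] += 1

    return listener


# Hypothetical queries returning made-up sample rows instead of hitting a database.
def query_user_tags(task_id):
    return [{"user_id": 1, "tag": "vip"}]


def query_withdrawals(task_id):
    return [{"user_id": 1, "amount": 100}]


exchange = FanoutExchange()
exchange.bind(make_listener("vs_user_tag_relation", query_user_tags))
exchange.bind(make_listener("vs_withdraw", query_withdrawals))
exchange.publish({"id": 1111})  # 1111 is only a sample task ID
```

After `publish` returns, `store["FileDownloadRecordEntity-1111"]` holds one array per listener plus the completion counter. In a real deployment the broker, not a thread pool, would deliver the message to each listener's queue.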

Workflow and State Management

The pattern often operates within a larger workflow managed by additional message queues (MQ1, MQ3) that apply processMessageOrRequeue logic^[001-TODO__報表下載.md]. These queues act as supervisors: they check a query counter (redis-hash.queryDoneCount) to determine whether all required listeners have finished before proceeding to subsequent steps, such as merging the data or converting it to CSV^[001-TODO__報表下載.md].
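The supervisor check can be pictured as a small decision function. The sketch below is only an assumption about how processMessageOrRequeue might decide. The source names the logic but does not show it, and both the `Decision` enum and the `expected_listeners` parameter are hypothetical.

```python
from enum import Enum


class Decision(Enum):
    PROCEED = "proceed"  # all listeners reported in; continue to merge / CSV step
    REQUEUE = "requeue"  # some listeners are still running; check again later


def process_message_or_requeue(task_hash, expected_listeners):
    """Compare queryDoneCount in the task's hash with the number of listeners."""
    # A missing counter is treated as zero completed queries.
    done = int(task_hash.get("queryDoneCount", 0))
    if done >= expected_listeners:
        return Decision.PROCEED
    return Decision.REQUEUE


# Usage: two listeners expected, only one has finished so far.
print(process_message_or_requeue({"queryDoneCount": 1}, expected_listeners=2))
```

Keeping the decision separate from the queue client makes the completion rule easy to test without a running broker.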

To handle system failures, the workflow retries a task (typically up to 6 times, with a 1-minute delay between attempts) before marking it as failed^[001-TODO__報表下載.md].
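A bounded retry loop of this kind might look like the sketch below. It assumes "up to 6 times" means one initial attempt followed by at most six retries; the source does not say how attempts are counted. The function name `run_with_retries` and the injectable `sleep` parameter are hypothetical. In a queue-based system the delay would more likely come from a delayed requeue than from an in-process sleep.

```python
import time

MAX_RETRIES = 6           # from the text: up to 6 retries
RETRY_DELAY_SECONDS = 60  # from the text: 1-minute delay


def run_with_retries(step, max_retries=MAX_RETRIES,
                     delay_seconds=RETRY_DELAY_SECONDS, sleep=time.sleep):
    """Call step() until it returns True; return "done" or "failed"."""
    retries = 0
    while True:
        if step():
            return "done"
        if retries == max_retries:
            return "failed"  # retry budget exhausted: mark the task as failed
        retries += 1
        sleep(delay_seconds)


# Usage with a no-op sleep so the example finishes instantly.
attempts = []


def flaky_step():
    attempts.append(1)
    return len(attempts) >= 3  # succeeds on the third attempt


result = run_with_retries(flaky_step, sleep=lambda seconds: None)
print(result, len(attempts))
```

Passing `sleep` as a parameter keeps the one-minute delay out of tests while leaving the production default unchanged.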

Data Storage and Concurrency

Data accumulation is handled via a Redis Hash structure with a defined Time-To-Live (TTL), typically set to 10 minutes^[001-TODO__報表下載.md].

  • Key Structure: FileDownloadRecordEntity-{id}^[001-TODO__報表下載.md]
  • Value Structure: A JSON-like object containing the queryDoneCount and arrays of data specific to each listener (e.g., vs_user_tag_relation, vs_withdraw)^[001-TODO__報表下載.md]
  • Increment Logic: As each listener finishes, it executes a HINCRBY command on the queryDoneCount field, which increments the counter atomically^[001-TODO__報表下載.md]
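The key, value, and increment rules above could be combined in a helper like the following sketch. It is written against the `hset`, `hincrby`, and `expire` methods that a redis-py client provides, but runs here against a tiny in-memory `FakeRedis` so that it is self-contained. Storing each listener's array as a JSON string in its own hash field, and refreshing the TTL on every write, are assumptions not confirmed by the source. `store_listener_result` and `FakeRedis` are hypothetical names.

```python
import json

TTL_SECONDS = 600  # 10 minutes, as described in the text


def task_key(task_id):
    return f"FileDownloadRecordEntity-{task_id}"


def store_listener_result(client, task_id, field, rows):
    """Write one listener's rows, bump queryDoneCount, and set the TTL."""
    key = task_key(task_id)
    client.hset(key, field, json.dumps(rows))        # one hash field per listener
    done = client.hincrby(key, "queryDoneCount", 1)  # atomic on a real Redis server
    client.expire(key, TTL_SECONDS)                  # assumption: refreshed on each write
    return done


class FakeRedis:
    """Minimal in-memory stand-in exposing only the three calls used above."""

    def __init__(self):
        self.hashes = {}
        self.ttls = {}

    def hset(self, name, key, value):
        self.hashes.setdefault(name, {})[key] = value

    def hincrby(self, name, key, amount=1):
        fields = self.hashes.setdefault(name, {})
        fields[key] = int(fields.get(key, 0)) + amount
        return fields[key]

    def expire(self, name, time):
        self.ttls[name] = time


client = FakeRedis()
store_listener_result(client, 1111, "vs_user_tag_relation", [{"user_id": 1}])
count = store_listener_result(client, 1111, "vs_withdraw", [{"user_id": 1}])
print(count)
```

Because the counter is incremented by the server rather than read, modified, and written back by the listener, two listeners finishing at the same moment cannot overwrite each other's update.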

Example Implementation

In a scenario involving multi-source data retrieval:

  1. Trigger: MQ2 sends a message with ID 1111 to a FanoutExchange^[001-TODO__報表下載.md].
  2. Fanout: The exchange routes the message to multiple listeners simultaneously^[001-TODO__報表下載.md].
  3. Execution:
    • Listener 1 queries plt_user.vs_user_tag_relation and pushes data to Redis^[001-TODO__報表下載.md].
    • Listener 2 queries plt_fund.vs_withdraw and pushes data to Redis^[001-TODO__報表下載.md].

This modular approach allows for the flexible addition or removal of data sources without altering the core reporting logic^[001-TODO__報表下載.md].

  • Message Queue
  • [[Event-Driven Architecture]]
  • [[Redis]]
  • [[Google Cloud Storage]]

Sources

  • 001-TODO__報表下載.md