
Multi-Query Coordination with Redis Hash

Multi-Query Coordination with Redis Hash is a distributed computing pattern used to manage complex, asynchronous data retrieval tasks across multiple data sources or projects. It utilizes Redis Hash structures to temporarily store intermediate results and coordinate the execution status of parallel sub-queries before aggregating the final output.^[001-todo.md]

This pattern is commonly used in scenarios such as report generation systems, where data must be fetched from distinct databases or schemas (e.g., plt_user, plt_fund, plt_account) and then joined or aggregated asynchronously^[001-todo.md].

Core Mechanism

The coordination mechanism relies on a Redis Hash entry with a specific Time-To-Live (TTL) to track the progress of distributed tasks^[001-todo.md].

  • Key Structure: The Redis key typically follows the format FileDownloadRecordEntity-{ID}, where the ID corresponds to the specific task or record identifier^[001-todo.md].
  • Data Storage: The hash fields hold the results of the individual queries, such as arrays of user IDs or withdrawal records. Each result is stored under a field named after the listener that produced it (e.g., plt_user.vs_user_tag_relation.user_id)^[001-todo.md].
  • Counter: A critical field within the hash is queryDoneCount. It acts as a barrier: each listener increments it with HINCRBY after it has successfully retrieved its data and stored it in the hash^[001-todo.md].
  • Lifecycle: The Redis hash has a default TTL of 10 minutes, so stale coordination data is cleaned up automatically if the process fails or times out^[001-todo.md].
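A minimal Python sketch of the write side of this mechanism follows: one listener storing its sub-query result in the shared hash and bumping the barrier counter. It assumes a redis-py-style client created with decode_responses=True (methods hset, hincrby, expire). The InMemoryHashStore class, the function names, and the JSON serialization of results are hypothetical stand-ins, not taken from the source. Re-applying the 10-minute TTL on every write is also an assumption; the source only states the default TTL.

```python
import json
from typing import Any

QUERY_DONE_FIELD = "queryDoneCount"
COORDINATION_TTL_SECONDS = 10 * 60  # default TTL of the coordination hash


class InMemoryHashStore:
    """Hypothetical stand-in for a redis-py client created with decode_responses=True.

    Only the hash commands used below are mimicked; TTLs are recorded but keys
    are never actually evicted.
    """

    def __init__(self) -> None:
        self.hashes: dict[str, dict[str, str]] = {}
        self.ttls: dict[str, int] = {}

    def hset(self, name: str, key: str, value: Any) -> int:
        fields = self.hashes.setdefault(name, {})
        created = key not in fields
        fields[key] = str(value)
        return int(created)  # like HSET: number of newly created fields

    def hincrby(self, name: str, key: str, amount: int = 1) -> int:
        fields = self.hashes.setdefault(name, {})
        fields[key] = str(int(fields.get(key, "0")) + amount)
        return int(fields[key])  # like HINCRBY: the value after the increment

    def hgetall(self, name: str) -> dict[str, str]:
        return dict(self.hashes.get(name, {}))

    def expire(self, name: str, seconds: int) -> bool:
        if name not in self.hashes:
            return False
        self.ttls[name] = seconds
        return True


def coordination_key(record_id: int) -> str:
    # Key format from the text: FileDownloadRecordEntity-{ID}
    return f"FileDownloadRecordEntity-{record_id}"


def store_sub_query_result(client, record_id: int, listener_field: str, rows: list) -> int:
    """Store one listener's sub-query result, then bump the barrier counter.

    Returns the queryDoneCount value after this listener's increment.
    """
    key = coordination_key(record_id)
    # 1. Write the data first, so a poller that sees the new count also sees the data.
    client.hset(key, listener_field, json.dumps(rows))
    # 2. Increment the shared counter; HINCRBY is atomic on the Redis server.
    done = client.hincrby(key, QUERY_DONE_FIELD, 1)
    # 3. (Re)apply the TTL so abandoned coordination data expires on its own.
    client.expire(key, COORDINATION_TTL_SECONDS)
    return done
```

Writing the result before incrementing the counter means a poller that observes queryDoneCount = N can expect N results to be present. With a real redis-py client, the three commands could also be queued on a `pipeline()` (transactional by default) so they are applied together.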

Workflow

The coordination process typically involves a series of Message Queue (MQ) listeners and processors working in stages:

  1. Fanout and Parallel Query: A message (e.g., containing a FileDownloadRecordEntity ID) is broadcast to a Fanout Exchange. Multiple listeners pick up this message simultaneously^[001-todo.md].
  2. Data Storage: Each listener executes its own database query (sub-query) and writes the result to the shared Redis Hash. After storing its result, the listener increments queryDoneCount^[001-todo.md].
  3. Polling and Verification: A polling process (e.g., processMessageOrRequeue) checks the Redis Hash. It evaluates whether the queryDoneCount has reached a required threshold (e.g., >= 2 or >= 1)^[001-todo.md].
  4. Aggregation or Retry:
    • If the count meets the threshold, the system proceeds to the next step, such as merging the data or converting it to a CSV for upload to cloud storage^[001-todo.md].
    • If the threshold is not met, the message may be requeued. A common strategy involves allowing a finite number of requeues (e.g., 6 times with a 1-minute delay) before marking the task as failed^[001-todo.md].
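The polling and retry decision in steps 3 and 4 can be sketched as a pure function over a snapshot of the hash (for example, the dict returned by HGETALL). This is a minimal illustration under stated assumptions: process_message_or_requeue is a snake_case rendering of the processMessageOrRequeue name from the source, but its signature here is hypothetical, as are the PollMessage type, the attempt counter carried on the message, the Outcome enum, and the use of csv.DictWriter for the CSV conversion. Republishing the message with a 1-minute delay, uploading the CSV, and setting FileDownloadRecordEntity.status are left to the caller, because the MQ and storage APIs are not described.

```python
import csv
import io
import json
from dataclasses import dataclass
from enum import Enum

MAX_REQUEUES = 6             # default requeue limit described in the text
REQUEUE_DELAY_SECONDS = 60   # default 1-minute delay between attempts


class Outcome(Enum):
    AGGREGATE = "aggregate"  # enough sub-queries finished: merge/convert and upload
    REQUEUE = "requeue"      # not ready yet: republish after REQUEUE_DELAY_SECONDS
    FAIL = "fail"            # retry budget exhausted: mark the record as failed


@dataclass
class PollMessage:
    record_id: int    # FileDownloadRecordEntity ID
    attempt: int = 0  # hypothetical: number of times this message was already requeued


def process_message_or_requeue(hash_fields: dict, msg: PollMessage, required_done: int) -> Outcome:
    """Decide the next step from a snapshot of the coordination hash (HGETALL result)."""
    # A missing field (or an expired hash) counts as zero completed sub-queries.
    done = int(hash_fields.get("queryDoneCount", 0))
    if done >= required_done:
        return Outcome.AGGREGATE
    if msg.attempt < MAX_REQUEUES:
        return Outcome.REQUEUE
    return Outcome.FAIL


def next_attempt(msg: PollMessage) -> PollMessage:
    """Build the message to republish when the outcome is REQUEUE."""
    return PollMessage(record_id=msg.record_id, attempt=msg.attempt + 1)


def rows_to_csv(hash_fields: dict, field: str) -> str:
    """Convert one stored JSON array of records into CSV text, e.g. before upload."""
    rows = json.loads(hash_fields[field])
    if not rows:
        return ""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()
```

Keeping the decision separate from the MQ client makes the retry budget easy to test: attempts 0 through 5 are requeued (six requeues in total), and the next poll after that fails. If the hash has already expired, HGETALL returns an empty dict, the count reads as 0, and the message follows the requeue path until the budget runs out.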

Error Handling and Reliability

To handle transient failures or slow sub-queries, the system implements a retry mechanism with a limit^[001-todo.md].

  • Requeue Logic: If the queryDoneCount check fails, the message is requeued (default: 6 times, 1-minute delay)^[001-todo.md].
  • Failure State: If the retry limit is exhausted without the queryDoneCount reaching the required threshold, the system updates the status of the entity (e.g., FileDownloadRecordEntity.status) to indicate failure^[001-todo.md].

Related

  • Message Queue
  • [[Redis]]
  • [[Asynchronous Processing]]
  • [[Distributed Transactions]]
  • [[Report Generation System]]

Sources

  • 001-todo.md