Async Task Architecture

1. Introduction

Before reading this page, make sure you are familiar with the Process API guide.

SuiteCRM 8 includes an Async Task system for executing long-running background jobs. It is built on Symfony Messenger and provides:

  • Three-phase batch processing — Queueing, Processing, and Finalizing.

  • Item-level progress tracking — real-time percentages, counts of completed/failed items.

  • Automatic item retry — failed items are retried up to a configurable limit.

  • Self-continuation — each batch dispatches the next, preventing long-running worker processes.

  • Configurable transport routing — route specific tasks to dedicated Messenger transports.

The system currently runs Manual Migration Tasks and is intended to run other long-running tasks and processes in the future.

2. High-Level Architecture

The Async Task system is composed of five layers:

[Figure: Architecture Layers]

3. Core Components

3.1 Interfaces and Base Classes

Class / Interface | Purpose
AsyncTaskHandlerInterface | Contract for task handlers. Defines the 3 phase methods plus retry/rerun policies. This is the primary interface you implement.
AbstractAsyncTaskHandler | Base class with sensible defaults (no finalization, 1 retry, no failure retry/rerun). Extend this for most handlers.
AsyncTaskRunnerInterface | Contract for runners. Each runner type (e.g. manual-migration-tasks, processes) has its own runner.
AsyncTaskRunner | Abstract base runner. Implements the three-phase loop, self-continuation, item management, and progress tracking. Concrete runners only override config keys and type.
AsyncTaskDispatcherInterface | Contract for dispatching async task messages. Centralises all message dispatch logic and routing.
AsyncTaskRouterInterface | Resolves which Messenger transport a task message should be routed to.
AsyncTaskBatchItemInterface | Value object representing one item to enqueue. Contains an item key, data payload, and optional sort order.

3.2 Registries

The system uses five tagged-interface registries for service discovery. All are populated automatically via Symfony’s autowiring and config/core_services.yaml:

Registry | Tag | Indexed By
AsyncTaskHandlerRegistry | app.async-task.handler | (type, handlerKey)
AsyncTaskRunnerRegistry | app.async-task.runner | type
AsyncTaskCompletedHandlerRegistry | app.async-task.completed.handler | (type, handlerKey)
AsyncTaskProgressedHandlerRegistry | app.async-task.progressed.handler | (type, handlerKey)
AsyncTaskFailureHandlerRegistry | app.async-task.failure.handler | (type, handlerKey)

The tagged interface definitions in config/core_services.yaml:

_instanceof:
  App\AsyncTask\Service\TaskHandler\AsyncTaskHandlerInterface:
    tags: [ 'app.async-task.handler' ]
  App\AsyncTask\Service\Runner\AsyncTaskRunnerInterface:
    tags: [ 'app.async-task.runner' ]
  App\AsyncTask\Service\TaskCompletedHandler\AsyncTaskCompletedHandlerInterface:
    tags: [ 'app.async-task.completed.handler' ]
  App\AsyncTask\Service\TaskProgressedHandler\AsyncTaskProgressedHandlerInterface:
    tags: [ 'app.async-task.progressed.handler' ]
  App\AsyncTask\Service\TaskFailureHandler\AsyncTaskFailureHandlerInterface:
    tags: [ 'app.async-task.failure.handler' ]

3.3 Messages

Four message types flow through Symfony Messenger:

Message Class | Dispatched When | Handled By
AsyncTaskRun | Task triggered or self-continuation | AsyncTaskRunMessageListener → Runner
AsyncTaskProgressed | After each processing batch | AsyncTaskProgressedMessageListener → Progressed handler
AsyncTaskCompleted | All or part of the items processed successfully | AsyncTaskCompletedMessageListener → Completed handler
AsyncTaskFailure | All items failed or unhandled exception | AsyncTaskFailureMessageListener → Failure handler

All messages carry the same core fields:

$taskId      // UUID of the task record
$taskType    // Runner type (e.g. 'manual-migration-tasks', 'processes')
$module      // Module name (for routing and record lookup)
$handlerKey  // Unique handler identifier (e.g. 'migrate-notes-files')
$data        // Custom task data from the Process options
$progress    // Phase/progress state (see section 5)

3.4 Message Listeners

Each message type has a dedicated listener:

Listener | Behaviour
AsyncTaskRunMessageListener | Bound to internal-async transport via #[AsMessageHandler(fromTransport: 'internal-async')]. Resolves the runner from AsyncTaskRunnerRegistry by type and calls runner->run(). Initialises a legacy system session for CLI authentication.
AsyncTaskProgressedMessageListener | Resolves handler from AsyncTaskProgressedHandlerRegistry. Calls handler->onProgress() to update the task record with current progress.
AsyncTaskCompletedMessageListener | Resolves handler from AsyncTaskCompletedHandlerRegistry. Calls handler->onComplete() to set the task status to completed and clean up.
AsyncTaskFailureMessageListener | Has two handler methods: handleInternalFailure() (bound to failed transport, handles AsyncTaskRun messages that threw exceptions) and handleFailureMessage() (handles explicit AsyncTaskFailure messages). Both resolve a failure handler and update the task record.

3.5 Item Repository

AsyncTaskItemRepository (core/backend/AsyncTask/Service/Repository/AsyncTaskItemRepository.php) manages the async_task_items table using DBAL QueryBuilder. Key operations:

Method | Purpose
addItemsToQueue() | Bulk inserts AsyncTaskBatchItem[] with status queued.
fetchBatch() | Fetches a batch of items by status (e.g. queued) with limit. JSON-decodes data and result_data.
updateItem() | Updates item status, result_data, and error_message.
countByStatus() | Returns counts grouped by status; used to calculate progress percentage.
retryFailedItems() | Re-queues failed items where retry_count < maxItemRetries. Used for automatic retry.
resetFailedItems() | Re-queues all failed items regardless of retry count. Used for user-initiated "Retry Failed".
purgeCompletedItems() | Hard-deletes all items except failed ones. Called on task completion to free space while preserving failure data.
purgeByTaskId() | Hard-deletes all items. Used for "Re-run" (start from scratch) and "Dismiss".

4. The Three-Phase Execution Model

The AsyncTaskRunner base class orchestrates task execution through three sequential phases. Each phase runs in batches, with a new Messenger message dispatched after each batch (the self-continuation pattern).

4.1 Phase 1: Queueing

[Figure: Queueing phase flowchart]

  • The handler’s getNextBatchToQueue() is called with $progress['enqueue_offset'] for pagination.

  • Returns an array of AsyncTaskBatchItem objects (or empty array when done).

  • Items are inserted into async_task_items with status queued.

  • The runner dispatches a new AsyncTaskRun message after each batch to continue queueing.

  • When getNextBatchToQueue() returns fewer items than $batchSize, the runner transitions to Processing.
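The queueing steps above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the SuiteCRM implementation: getNextBatch() and runQueueingStep() are hypothetical stand-ins, the source data is an in-memory array, batch items are plain arrays rather than AsyncTaskBatchItem objects, and it is assumed that the cursor advances by the number of items returned. In the real system each step would be triggered by its own AsyncTaskRun message rather than a loop.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for a handler's getNextBatchToQueue():
 * returns up to $batchSize items starting at the enqueue offset.
 */
function getNextBatch(array $sourceIds, array $progress, int $batchSize): array
{
    $offset = $progress['enqueue_offset'] ?? 0;
    $slice = array_slice($sourceIds, $offset, $batchSize);

    $items = [];
    foreach ($slice as $i => $id) {
        // Each item carries a key, a data payload and a sort order.
        $items[] = ['item_key' => $id, 'data' => ['id' => $id], 'sort_order' => $offset + $i];
    }

    return $items;
}

/**
 * Simulates one queueing run: store the batch as "queued", advance the
 * cursor, and switch phase when the batch comes back short.
 */
function runQueueingStep(array $sourceIds, array $progress, int $batchSize, array &$queue): array
{
    $batch = getNextBatch($sourceIds, $progress, $batchSize);
    foreach ($batch as $item) {
        $queue[] = $item + ['status' => 'queued'];
    }

    $progress['enqueue_offset'] = ($progress['enqueue_offset'] ?? 0) + count($batch);
    $progress['total'] = count($queue);

    if (count($batch) < $batchSize) {
        $progress['phase'] = 'processing';
    }

    return $progress;
}

$sourceIds = ['a', 'b', 'c', 'd', 'e'];
$queue = [];
$progress = ['phase' => 'queueing', 'enqueue_offset' => 0, 'total' => 0];
$runs = 0;

// The loop stands in for repeated AsyncTaskRun messages.
while ($progress['phase'] === 'queueing') {
    $progress = runQueueingStep($sourceIds, $progress, 2, $queue);
    $runs++;
}
```

With five source records and a batch size of two, the sketch needs three runs: two full batches and one short batch that triggers the transition to Processing.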

4.2 Phase 2: Processing

[Figure: Processing phase flowchart]

  • Items are fetched in batches from async_task_items where status is queued.

  • Each item’s status is set to processing, then handler->processItem() is called.

  • The handler returns a Feedback object. On success, the item moves to completed. On failure, it is either re-queued (if retry count allows) or marked failed.

  • Progress is recalculated from DB counts after each batch: percent = (completed + failed + skipped) / total * 100.

  • An AsyncTaskProgressed message is dispatched to update the task record in the UI.

  • A new AsyncTaskRun message is dispatched to process the next batch.

  • When no more queued items remain, the runner checks handler->hasFinalization() to decide the next phase.

4.3 Phase 3: Finalizing (Optional)

[Figure: Finalizing phase flowchart]

  • Only runs if the handler returns true from hasFinalization().

  • Calls handler->finalize($task) once; used for post-processing work like merging output files or generating summaries.

  • After finalization, dispatches AsyncTaskCompleted and cleans up non-failed items.

4.4 Self-Continuation Pattern

The runner never processes all items in a single Messenger message. After each batch, it dispatches a new AsyncTaskRun message with the updated progress state. This pattern:

  • Prevents long-running worker processes (each message is short-lived).

  • Keeps memory usage bounded (each batch is independent).

  • Allows the Messenger worker to process other messages between batches.

  • Makes the system resilient — if a worker dies mid-batch, only the current item is affected.
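The pattern can be illustrated with a small simulation. The sketch below is not the runner's code: an SplQueue stands in for a Messenger transport, messages are plain strings, and handleRun() is a hypothetical function that processes one batch and then re-dispatches itself only if queued items remain. It shows how an unrelated message gets a turn between batches.

```php
<?php
declare(strict_types=1);

/**
 * Handles one "run" message: processes a single batch, then dispatches
 * a follow-up "run" message if queued items remain.
 */
function handleRun(array &$items, SplQueue $transport, array &$log, int $batchSize): void
{
    $processed = 0;
    foreach ($items as $index => $status) {
        if ($status !== 'queued') {
            continue;
        }
        if ($processed === $batchSize) {
            break; // batch is full; leave the rest for the next message
        }
        $items[$index] = 'completed';
        $processed++;
    }

    $log[] = "run:$processed";

    if (in_array('queued', $items, true)) {
        $transport->enqueue('run'); // self-continuation
    }
}

$transport = new SplQueue();          // in-memory stand-in for a transport
$items = array_fill(0, 5, 'queued');  // five queued items
$log = [];

$transport->enqueue('run');
$transport->enqueue('other');         // unrelated message on the same transport

// Stand-in for the worker loop.
while (!$transport->isEmpty()) {
    $message = $transport->dequeue();
    if ($message === 'run') {
        handleRun($items, $transport, $log, 2);
    } else {
        $log[] = 'other';
    }
}
```

Because the follow-up message is appended to the end of the queue, the unrelated message is handled after the first batch instead of waiting for all five items.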

[Figure: Async Task Run Sequence]

5. Progress Tracking

The progress state is a PHP array carried in every AsyncTaskRun message and persisted on the task record. It contains:

Key | Description
phase | Current phase: 'queueing', 'processing', or 'finalizing'.
total | Total items enqueued so far.
queued | Items still waiting to be processed.
completed | Items successfully processed.
failed | Items that failed (after exhausting retries).
skipped | Items skipped by the handler.
percent | Overall progress: (completed + failed + skipped) / total * 100.
enqueue_offset | Pagination cursor for the queueing phase.

Progress is recalculated from the database after each processing batch using AsyncTaskItemRepository::countByStatus(). This ensures accuracy even if items were retried.
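As a worked example of the percent formula, the sketch below derives the progress counters from per-status counts of the kind countByStatus() is described as returning. The function name calculateProgress(), the exact shape of the input array, the treatment of total as the sum of all statuses, and the rounding to two decimals are assumptions made for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Builds progress counters from counts grouped by status.
 * Missing statuses are treated as zero.
 */
function calculateProgress(array $countsByStatus): array
{
    $counts = $countsByStatus + [
        'queued' => 0,
        'processing' => 0,
        'completed' => 0,
        'failed' => 0,
        'skipped' => 0,
    ];

    $total = array_sum($counts);
    // Items count as "finished" once they can no longer change state on their own.
    $finished = $counts['completed'] + $counts['failed'] + $counts['skipped'];

    return [
        'total' => $total,
        'queued' => $counts['queued'],
        'completed' => $counts['completed'],
        'failed' => $counts['failed'],
        'skipped' => $counts['skipped'],
        // Guard against division by zero before anything is enqueued.
        'percent' => $total > 0 ? round($finished / $total * 100, 2) : 0.0,
    ];
}

$progress = calculateProgress(['queued' => 30, 'completed' => 60, 'failed' => 6, 'skipped' => 4]);
```

Here 70 of 100 items are finished (60 completed, 6 failed, 4 skipped), so percent is 70.0 even though 30 items are still queued.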

6. Error Handling and Retry

The system provides two levels of error recovery:

6.1 Automatic Item-Level Retry

When processItem() returns a failure (or throws an exception), the runner checks the item’s retry_count against handler->getMaxItemRetries():

  • If retry_count < maxItemRetries: the item is re-queued with an incremented retry_count. It will be picked up again in a subsequent processing batch.

  • If retry_count >= maxItemRetries: the item is marked as permanently failed.

The default from AbstractAsyncTaskHandler is getMaxItemRetries() = 1, meaning each item gets one automatic retry.
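The retry rule can be expressed as a small decision function. This is a sketch of the rule as described above, not the runner's actual code; resolveItemOutcome() and its return shape are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Decides the next status of an item after one processing attempt.
 *
 * @return array{status: string, retry_count: int}
 */
function resolveItemOutcome(bool $success, int $retryCount, int $maxItemRetries): array
{
    if ($success) {
        return ['status' => 'completed', 'retry_count' => $retryCount];
    }

    if ($retryCount < $maxItemRetries) {
        // Re-queue with an incremented counter; a later batch picks it up again.
        return ['status' => 'queued', 'retry_count' => $retryCount + 1];
    }

    // Retries exhausted: the item is permanently failed.
    return ['status' => 'failed', 'retry_count' => $retryCount];
}

// With the default limit of 1, an item that keeps failing is attempted twice.
$firstAttempt = resolveItemOutcome(false, 0, 1);
$secondAttempt = resolveItemOutcome(false, $firstAttempt['retry_count'], 1);
```

With a limit of 0 the first failure is final, which matches the "0 = no retry" convention of getMaxItemRetries().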

6.2 User-Initiated Recovery

After a task completes with failures, the administrator can trigger recovery actions (if the handler supports them):

Action | Handler Method | Behaviour
Retry Failed | allowsFailureRetry() = true | Calls itemRepository->resetFailedItems(), which re-queues all failed items with retry_count=0. Dispatches new AsyncTaskRun to restart processing.
Re-run | allowsFailureRerun() = true | Calls itemRepository->purgeByTaskId(), which deletes all items. Dispatches new AsyncTaskRun starting from the queueing phase.

6.3 Cleanup on Completion

When a task completes, the runner calls purgeCompletedItems() which hard-deletes all items except those with status failed. This frees database space while preserving failure data for administrator review.

When the administrator dismisses a task, purgeByTaskId() hard-deletes all remaining items (including failed ones) and the task record is soft-deleted.

7. Dispatch and Routing

7.1 AsyncTaskDispatcher

All async task messages must be dispatched through AsyncTaskDispatcherInterface — never directly via MessageBusInterface. The dispatcher:

  1. Constructs the appropriate message object.

  2. Consults AsyncTaskRouter for transport routing.

  3. Attaches a TransportNamesStamp if a specific transport is resolved.

  4. Dispatches the message to the Symfony Messenger bus.

7.2 AsyncTaskRouter

The router resolves which Messenger transport a message should be sent to, based on the module name and handler key. Resolution order:

  1. Environment variable (ASYNC_TASK_ROUTES JSON) — checked first.

  2. Config file (config/services/async-task/transports.yaml via %async-task.routes%) — fallback.

  3. Default Messenger routing — if no route is found, the message follows the standard Messenger routing rules (which sends all AsyncTaskRun messages to internal-async).

Within each source, the router checks:

  1. modules[<module>][<handlerKey>] — module-specific route (most specific).

  2. default[<handlerKey>] — handler-level default.

This allows administrators to route heavy tasks (e.g. PDF exports) to a dedicated transport with different worker resources.
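The resolution order can be sketched as a pure function. The following is an illustration under assumptions, not the AsyncTaskRouter implementation: resolveTransport() is a hypothetical name, the route structure is assumed to be an array with modules and default keys as described above, the config source is assumed to be consulted whenever the environment source yields no match, and the transport names heavy-async and migration-async are invented for the example.

```php
<?php
declare(strict_types=1);

/**
 * Resolves a transport name, or null when default Messenger routing applies.
 *
 * @param string|null $envJson      Raw JSON from the environment variable, if set
 * @param array       $configRoutes Routes loaded from the config file
 */
function resolveTransport(string $module, string $handlerKey, ?string $envJson, array $configRoutes): ?string
{
    $sources = [];

    // 1. Environment variable is checked first.
    if ($envJson !== null && $envJson !== '') {
        $decoded = json_decode($envJson, true);
        if (is_array($decoded)) {
            $sources[] = $decoded;
        }
    }

    // 2. Config file is the fallback.
    $sources[] = $configRoutes;

    foreach ($sources as $routes) {
        // Module-specific route wins over the handler-level default.
        $transport = $routes['modules'][$module][$handlerKey]
            ?? $routes['default'][$handlerKey]
            ?? null;

        if (is_string($transport) && $transport !== '') {
            return $transport;
        }
    }

    // 3. No route found: leave routing to Messenger's own rules.
    return null;
}

$env = '{"modules":{"Notes":{"migrate-notes-files":"heavy-async"}}}';
$config = ['default' => ['migrate-notes-files' => 'migration-async']];

$forNotes = resolveTransport('Notes', 'migrate-notes-files', $env, $config);
$forAccounts = resolveTransport('Accounts', 'migrate-notes-files', $env, $config);
```

In this example the Notes module matches the environment route, while Accounts has no module-specific entry and falls through to the handler-level default from the config source.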

8. Process API Integration

The Process API is the bridge between frontend action buttons and the async task system.

[Figure: Process Processor flow]

The ProcessHandler (action class) is the component you write to connect a frontend button to the async task system. Its configure() method sets:

  • setAsync(true): tells ProcessProcessor to dispatch instead of calling run().

  • setAsyncHandlerKey(…): the task handler’s key.

  • setAsyncRunnerType(…): the runner type (e.g. 'manual-migration-tasks').

The run() method is left empty — async processes are executed by the Runner, not the ProcessHandler.

9. Database Schema

9.1 async_task_items

Stores individual items being processed. Shared across all async task types.

Column | Type | Description
id | char(36) | UUID primary key.
async_task_id | char(36) | Foreign key to the parent task record.
item_key | varchar(255) | Unique identifier within the task (typically the source record ID).
status | varchar(50) | 'queued', 'processing', 'completed', 'failed', or 'skipped'.
data | longtext | JSON-encoded item data. Passed to processItem() as decoded array.
result_data | longtext | JSON result from the Feedback::data property.
error_message | text | Failure reason (first message from Feedback::messages).
retry_count | int | Number of automatic retries attempted.
sort_order | int | Processing order hint.
deleted | bool | Soft-delete flag.

9.2 Task Record Tables

Each runner type uses its own task record table (e.g. manual_migration_tasks, processes). These tables share common fields via the asynctask vardef template:

Column | Type | Description
id | char(36) | UUID primary key.
name | varchar(255) | Human-readable task name.
status | varchar(50) | 'initial', 'pending', 'running', 'completed', or 'failed'.
service_key | varchar(255) | Maps to the handler’s getHandlerKey().
phase | varchar(50) | 'queueing', 'processing', or 'finalizing'.
progress | text | JSON-encoded progress object (see section 5).
last_run_datetime | datetime | Timestamp of the most recent execution.
allow_failure_retry_action | bool | Enables the "Retry Failed" button.
allow_failure_rerun_action | bool | Enables the "Re-run" button.

10. Concrete Runner Types

The AsyncTaskRunner base class is abstract. Concrete runners only need to specify their type name and the system config keys for batch sizes:

Runner Type | Class | Batch Size Config Keys
manual-migration-tasks | AsyncManualMigrationRunner | max_migration_items_to_queue_per_run (default 50), max_migration_items_to_process_per_run (default 20)
processes | AsyncProcessRunner | max_processes_items_to_queue_per_run (default 50), max_processes_items_to_process_per_run (default 20)

Example — a concrete runner is minimal:

class AsyncManualMigrationRunner extends AsyncTaskRunner
{
    const TYPE = 'manual-migration-tasks';

    public function getType(): string
    {
        return self::TYPE;
    }

    protected function getMaxItemsToQueuePerRunConfigKey(): string
    {
        return 'max_migration_items_to_queue_per_run';
    }

    protected function getMaxItemsToProcessPerRunConfigKey(): string
    {
        return 'max_migration_items_to_process_per_run';
    }
}

11. The Feedback Model

App\Engine\Model\Feedback is the return type for both processItem() and finalize(). It carries the result of processing back to the runner.

Property | Usage
success | true = item completed, false = item failed.
messages | Array of strings. On failure, the first message is stored as error_message on the item.
data | Associative array. Stored as JSON in the item’s result_data column. Use for output references (e.g. media object IDs).

All setters are chainable:

return (new Feedback())
    ->setSuccess(true)
    ->setMessages(['Migrated successfully'])
    ->setData(['media_object_id' => $id]);

12. The AsyncTaskHandlerInterface

This is the primary interface you implement when creating a new async task handler.

interface AsyncTaskHandlerInterface
{
    public function getHandlerKey(): string;
    public function getType(): string;

    /** @return AsyncTaskBatchItemInterface[] */
    public function getNextBatchToQueue(
        Record $task, array $progress, int $batchSize
    ): array;

    public function processItem(Record $task, array $item): Feedback;

    public function finalize(Record $task): Feedback;
    public function hasFinalization(): bool;

    public function getMaxItemRetries(): int;
    public function allowsFailureRetry(): bool;
    public function allowsFailureRerun(): bool;
}

12.1 Method Reference

Method | Description
getHandlerKey() | Unique identifier for this handler (e.g. 'migrate-notes-files'). Stored in the task record’s service_key field.
getType() | Runner type (e.g. 'manual-migration-tasks'). Determines which AsyncTaskRunner subclass orchestrates execution.
getNextBatchToQueue() | Called repeatedly during the Queueing phase. Use $progress['enqueue_offset'] for pagination. Return AsyncTaskBatchItem[] or empty array when done.
processItem() | Called once per item during the Processing phase. Receives the item row (with decoded data). Returns Feedback.
finalize() | Called once during the Finalizing phase (only if hasFinalization() is true). Use for post-processing.
hasFinalization() | Return true if this handler needs a finalization phase.
getMaxItemRetries() | Max automatic retries per failed item. 0 = no retry. Default: 1.
allowsFailureRetry() | Whether administrators can trigger "Retry Failed" from the UI after the task completes with failures.
allowsFailureRerun() | Whether administrators can trigger "Re-run" from the UI after the task completes with failures.

12.2 The $item Array (processItem input)

The $item parameter passed to processItem() is a decoded row from async_task_items:

Key | Description
id | The item’s UUID in the database.
item_key | The unique key you provided when creating the AsyncTaskBatchItem.
data | The associative array you provided as the AsyncTaskBatchItem data payload (JSON-decoded).
sort_order | The item’s position in the queue.
result_data | Any previously stored result data (populated on retry).
error_message | Previous error message (populated on retry).
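To show how these keys might be consumed, here is a minimal sketch of a processItem()-style function. It is written to run on its own, so several things are assumptions: the Feedback class below is a small stand-in for App\Engine\Model\Feedback (only the chainable setters come from this page; the getters are invented for the example), the Record $task parameter is omitted, and the path key inside data is a hypothetical payload field.

```php
<?php
declare(strict_types=1);

/** Minimal stand-in for App\Engine\Model\Feedback; getters are assumptions. */
final class Feedback
{
    private bool $success = false;
    private array $messages = [];
    private array $data = [];

    public function setSuccess(bool $success): self { $this->success = $success; return $this; }
    public function setMessages(array $messages): self { $this->messages = $messages; return $this; }
    public function setData(array $data): self { $this->data = $data; return $this; }

    public function isSuccess(): bool { return $this->success; }
    public function getMessages(): array { return $this->messages; }
    public function getData(): array { return $this->data; }
}

/** Processes one decoded item row and reports the outcome. */
function processItem(array $item): Feedback
{
    // 'data' is the payload supplied when the item was enqueued.
    $path = $item['data']['path'] ?? null;

    if (!is_string($path) || $path === '') {
        // The first message would end up in the item's error_message column.
        return (new Feedback())
            ->setSuccess(false)
            ->setMessages(["Item {$item['item_key']} has no path"]);
    }

    // 'data' on the Feedback would be stored as result_data.
    return (new Feedback())
        ->setSuccess(true)
        ->setMessages(['Processed successfully'])
        ->setData(['processed_path' => $path]);
}

$ok = processItem(['id' => 'uuid-1', 'item_key' => 'note-1', 'data' => ['path' => 'upload/note-1']]);
$bad = processItem(['id' => 'uuid-2', 'item_key' => 'note-2', 'data' => []]);
```

Returning a failed Feedback instead of throwing keeps the failure reason explicit, so it can be surfaced to administrators through error_message.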

13. File Locations

Component | Path
Handler interface | core/backend/AsyncTask/Service/TaskHandler/AsyncTaskHandlerInterface.php
Handler base class | core/backend/AsyncTask/Service/TaskHandler/AbstractAsyncTaskHandler.php
Batch item | core/backend/AsyncTask/Service/TaskHandler/AsyncTaskBatchItem.php
Runner base class | core/backend/AsyncTask/Service/Runner/AsyncTaskRunner.php
Runner registry | core/backend/AsyncTask/Service/Runner/AsyncTaskRunnerRegistry.php
Dispatcher | core/backend/AsyncTask/Service/Dispatcher/AsyncTaskDispatcher.php
Router | core/backend/AsyncTask/Service/Router/AsyncTaskRouter.php
Item repository | core/backend/AsyncTask/Service/Repository/AsyncTaskItemRepository.php
Messages | core/backend/AsyncTask/Message/AsyncTask*.php
Listeners | core/backend/AsyncTask/Service/MessageListener/AsyncTask*MessageListener.php
Feedback model | core/backend/Engine/Model/Feedback.php
Process integration | core/backend/Process/DataPersister/ProcessProcessor.php
Manual migration runner | core/backend/ManualMigrations/AsyncTask/AsyncManualMigrationRunner.php
Service config | config/core_services.yaml
Transport routing config | config/services/async-task/transports.yaml

14. Next Steps

Content is available under GNU Free Documentation License 1.3 or later unless otherwise noted.