Development Guide

Customize attachment processing

Use WorkerAdapter's streamAttachments function to handle attachment processing during the attachments extraction phase of an Airdrop import. By default, it processes items from the attachments repo, which contains the URLs from which attachments are streamed. Specify the batchSize parameter to control how many attachments are processed simultaneously.

The attachment processing logic resides in attachment-extraction.ts.

Default processing

When no custom processing logic is specified, the default processing logic is used.

const response = await adapter.streamAttachments({
  stream: getAttachmentStream, // Function that fetches attachment content
  batchSize: 10,
});
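
The stream option is a function you supply. It receives the attachment item and should resolve to either a streamed HTTP response or an error. Below is a minimal, self-contained sketch of that shape; the HTTP call is injected so the example has no dependencies, and every name except the { item } parameter is illustrative, not part of the SDK:

```typescript
// Sketch only: the HTTP call is injected so this runs without axios.
// In a real connector you would pass something like
// (url) => axios.get(url, { responseType: 'stream' }).
type AttachmentItem = { id: string; url: string };

type StreamingResult =
  | { httpStream: unknown }          // success: the streamed HTTP response
  | { error: { message: string } }; // failure: reported back to the adapter

const makeAttachmentStreamer =
  (httpGet: (url: string) => Promise<unknown>) =>
  async ({ item }: { item: AttachmentItem }): Promise<StreamingResult> => {
    try {
      const httpStream = await httpGet(item.url);
      return { httpStream };
    } catch {
      return { error: { message: `Failed to stream attachment ${item.id}.` } };
    }
  };
```

A streamer built this way can then be passed as the stream option, e.g. const getAttachmentStream = makeAttachmentStreamer(fetchUrl).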

Customize processing

Sometimes custom logic is needed for attachment processing, for example when attachment URLs have short expiration times and would no longer be valid by the time the sync reaches the attachments extraction phase.

In such cases, you can provide custom processors to the streamAttachments function:

  • reducer: Transforms attachments into the format you need (grouping, filtering, batching, etc.).
  • iterator: Processes the transformed data and calls adapter.processAttachment() for each attachment.
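
Conceptually, streamAttachments runs the reducer once over the batch of attachments and then hands its output to the iterator. A simplified, library-free sketch of that control flow (illustrative only, not the SDK's internals; the type and function names here are invented for the sketch):

```typescript
// Illustrative control flow only - not the actual SDK implementation.
type Attachment = { id: string; parent_id: string; url: string };

type ReducerFn<NewBatch> = (args: {
  attachments: Attachment[];
  batchSize?: number;
}) => NewBatch;

type IteratorFn<NewBatch> = (args: {
  reducedAttachments: NewBatch;
}) => Promise<{ delay?: number } | undefined>;

async function streamAttachmentsSketch<NewBatch>(
  attachments: Attachment[],
  processors: { reducer: ReducerFn<NewBatch>; iterator: IteratorFn<NewBatch> },
  batchSize?: number,
) {
  // 1. Transform the raw batch (group, filter, re-batch, ...).
  const reducedAttachments = processors.reducer({ attachments, batchSize });
  // 2. Let the iterator walk the transformed data and process each attachment.
  return processors.iterator({ reducedAttachments });
}
```

The split keeps the two concerns independent: the reducer is pure data shaping, while the iterator owns side effects such as API calls and rate-limit handling.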

Processing type requirements

ExternalSystemAttachmentReducerFunction
export type ExternalSystemAttachmentReducerFunction<
  Batch,
  NewBatch,
  ConnectorState,
> = ({
  attachments,
  adapter,
  batchSize,
}: {
  attachments: Batch;
  adapter: WorkerAdapter<ConnectorState>;
  batchSize?: number;
}) => NewBatch;
ExternalSystemAttachmentIteratorFunction
export type ExternalSystemAttachmentIteratorFunction<
  NewBatch,
  ConnectorState
> = ({
  reducedAttachments,
  adapter,
  stream,
}: {
  reducedAttachments: NewBatch;
  adapter: WorkerAdapter<ConnectorState>;
  stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;
// Example: Custom processors for refreshing expired URLs
import { processTask, ExtractorEventType, NormalizedAttachment } from '@devrev/ts-adaas';

// Custom reducer function - transforms the input attachments array
const reducer = ({ attachments, adapter }) => {
  // Group attachments by their parent_id (ticket/task/etc.) to minimize API calls
  // Instead of fetching each attachment individually, fetch the parent once
  // to get all its attachments with fresh URLs
  return attachments.reduce((parentMap, attachment) => {
    const parentId = attachment.parent_id;
    
    // Create a new array for this parent if it doesn't exist yet
    if (!parentMap.has(parentId)) {
      parentMap.set(parentId, []);
    }
    
    // Add this attachment to the parent's group
    parentMap.get(parentId)!.push(attachment);
    return parentMap;
  }, new Map<string, NormalizedAttachment[]>());
};

// Custom iterator function - processes the reduced data
const iterator = async ({ reducedAttachments, adapter, stream }) => {
  // reducedAttachments is now a Map where:
  // - key: parent_id (e.g., ticket ID, issue ID)
  // - value: array of attachments belonging to that parent
  
  for (const [parentId, attachments] of reducedAttachments) {
    // Fetch the parent item to get fresh attachment URLs
    // Implement your custom logic here (API call, URL refresh, token renewal, etc.)
    const freshAttachments = await refetchParentAttachments(parentId);
    
    // Process each attachment in this parent group
    for (const attachment of attachments) {
      // Find the corresponding fresh attachment data
      const freshAttachment = freshAttachments.find(fresh => fresh.id === attachment.id);
      
      // Update the attachment with fresh data (URL, filename, etc.)
      const updatedAttachment = freshAttachment 
        ? { ...attachment, url: freshAttachment.url, file_name: freshAttachment.file_name }
        : attachment; // fallback to original if not found
        
      // Use the SDK's built-in attachment processing
      const result = await adapter.processAttachment(updatedAttachment, stream);
      
      // Handle rate limiting - return immediately to trigger retry
      if (result?.delay) return { delay: result.delay };
      
      // Log errors but continue processing other attachments
      if (result?.error) {
        console.warn(`Failed to process attachment ${attachment.id}:`, result.error);
      }
    }
  }
  
  // Return undefined to signal successful completion
  return;
};

// Example usage within a processTask worker
processTask({
  task: async ({ adapter }) => {
    try {
      const response = await adapter.streamAttachments({
        stream: getAttachmentStream, // Function that fetches attachment content
        processors: {
          reducer,  // Transform attachments into groups
          iterator  // Process groups with custom logic
        },
      });
      // ...
    } catch (error) {
      console.error('Failed to stream attachments:', error);
    }
  },
  // ...
});
