Customize attachment processing

Use WorkerAdapter’s streamAttachments function to handle attachment processing during the attachments extraction phase of an Airdrop import. By default, it processes items from the attachments repo, which contains the URLs to stream the attachments. Specify the batchSize parameter to control how many attachments are processed simultaneously.

The attachment processing logic resides in attachment-extraction.ts.

Default processing

When no custom processors are provided, the default processing logic is used.

const response = await adapter.streamAttachments({
  stream: getAttachmentStream, // Function that fetches attachment content
  batchSize: 10,
});
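The `getAttachmentStream` function passed as `stream` is code you supply. Below is a minimal sketch of the shape it might take, with the HTTP client injected so the result mapping is easy to test in isolation. The `{ httpStream } / { delay } / { error }` result shape mirrors the SDK's streaming response, but the local type names and the 429-to-delay mapping here are illustrative assumptions, not the SDK's definitions.

```typescript
// Local stand-in for the SDK's streaming response shape (assumption).
type StreamResult =
  | { httpStream: unknown }
  | { error: { message: string } }
  | { delay: number };

// Factory: wraps any HTTP getter into a streaming function.
// Injecting `get` keeps the error/rate-limit mapping testable without a network.
const makeAttachmentStream =
  (get: (url: string) => Promise<{ status: number; body: unknown }>) =>
  async ({ item }: { item: { id: string; url: string } }): Promise<StreamResult> => {
    try {
      const response = await get(item.url);
      if (response.status === 429) {
        // Rate limited: ask the adapter to retry after a delay (in seconds).
        return { delay: 30 };
      }
      return { httpStream: response.body };
    } catch {
      // Report the failure; the adapter logs it and moves on.
      return { error: { message: `Failed to stream attachment ${item.id}` } };
    }
  };
```

In a real worker you would build `getAttachmentStream` from this factory with your actual HTTP client (for example, an `axios` call with `responseType: 'stream'`).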

Customize processing

Sometimes custom logic is needed for attachment processing, for example when attachment URLs have short expiration times and would expire before the sync reaches the attachments extraction phase.

In such cases, you can provide custom processors to the streamAttachments function:

  • reducer: Transforms attachments into the format you need (grouping, filtering, batching, etc.).
  • iterator: Processes the transformed data and calls adapter.processAttachment() for each attachment.

Processing type requirements

ExternalSystemAttachmentReducerFunction
export type ExternalSystemAttachmentReducerFunction<
  Batch,
  NewBatch,
  ConnectorState,
> = ({
  attachments,
  adapter,
  batchSize,
}: {
  attachments: Batch;
  adapter: WorkerAdapter<ConnectorState>;
  batchSize?: number;
}) => NewBatch;
ExternalSystemAttachmentIteratorFunction
export type ExternalSystemAttachmentIteratorFunction<
  NewBatch,
  ConnectorState,
> = ({
  reducedAttachments,
  adapter,
  stream,
}: {
  reducedAttachments: NewBatch;
  adapter: WorkerAdapter<ConnectorState>;
  stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;
Implementation example
// Example: Custom processors for refreshing expired URLs
import { processTask, ExtractorEventType, NormalizedAttachment } from '@devrev/ts-adaas';

// Custom reducer function - transforms the input attachments array
const reducer = ({ attachments, adapter }) => {
  // Group attachments by their parent_id (ticket/task/etc.) to minimize API calls.
  // Instead of fetching each attachment individually, fetch the parent once
  // to get all its attachments with fresh URLs.
  return attachments.reduce((parentMap, attachment) => {
    const parentId = attachment.parent_id;

    // Create a new array for this parent if it doesn't exist yet
    if (!parentMap.has(parentId)) {
      parentMap.set(parentId, []);
    }

    // Add this attachment to the parent's group
    parentMap.get(parentId)!.push(attachment);
    return parentMap;
  }, new Map<string, NormalizedAttachment[]>());
};

// Custom iterator function - processes the reduced data
const iterator = async ({ reducedAttachments, adapter, stream }) => {
  // reducedAttachments is now a Map where:
  // - key: parent_id (e.g., ticket ID, issue ID)
  // - value: array of attachments belonging to that parent

  for (const [parentId, attachments] of reducedAttachments) {
    // Fetch the parent item to get fresh attachment URLs.
    // Implement your custom logic here (API call, URL refresh, token renewal, etc.)
    const freshAttachments = await refetchParentAttachments(parentId);

    // Process each attachment in this parent group
    for (const attachment of attachments) {
      // Find the corresponding fresh attachment data
      const freshAttachment = freshAttachments.find((fresh) => fresh.id === attachment.id);

      // Update the attachment with fresh data (URL, filename, etc.)
      const updatedAttachment = freshAttachment
        ? { ...attachment, url: freshAttachment.url, file_name: freshAttachment.file_name }
        : attachment; // fallback to original if not found

      // Use the SDK's built-in attachment processing
      const result = await adapter.processAttachment(updatedAttachment, stream);

      // Handle rate limiting - return immediately to trigger retry
      if (result?.delay) return { delay: result.delay };

      // Log errors but continue processing other attachments
      if (result?.error) {
        console.warn(`Failed to process attachment ${attachment.id}:`, result.error);
      }
    }
  }

  // Return undefined to signal successful completion
  return;
};

// Example usage within a processTask worker
processTask({
  task: async ({ adapter }) => {
    try {
      const response = await adapter.streamAttachments({
        stream: getAttachmentStream, // Function that fetches attachment content
        processors: {
          reducer, // Transform attachments into groups
          iterator, // Process groups with custom logic
        },
      });
      // ...
    } catch (error) {
      // ...
    }
  },
  // ...
});
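The `refetchParentAttachments` call in the example above is a placeholder for your own logic. One way it might look, assuming a hypothetical external-system endpoint that returns freshly signed attachment URLs per parent; the URL, response shape, and auth header here are all illustrative, not part of the SDK:

```typescript
// Shape of the fresh attachment data the example expects (assumption).
type FreshAttachment = { id: string; url: string; file_name: string };

// Hypothetical helper: re-fetch a parent item's attachments so every URL
// is freshly signed. The endpoint and auth scheme are placeholders for
// your external system's API.
const refetchParentAttachments = async (
  parentId: string,
  token?: string,
): Promise<FreshAttachment[]> => {
  const response = await fetch(
    `https://api.example.com/items/${parentId}/attachments`, // hypothetical endpoint
    { headers: token ? { Authorization: `Bearer ${token}` } : undefined },
  );
  if (!response.ok) {
    throw new Error(`Failed to refetch attachments for ${parentId}: ${response.status}`);
  }
  return (await response.json()) as FreshAttachment[];
};
```

Throwing on a failed refetch lets the surrounding worker decide whether to retry or report the error, rather than silently processing stale URLs.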