Customize attachment processing
Use WorkerAdapter's streamAttachments function to handle attachment processing during the attachments extraction phase of an Airdrop import.
By default, it processes items from the attachments repo, which contains the URLs to stream the attachments.
Specify the batchSize parameter to control how many attachments are processed simultaneously.
The attachment processing logic resides in attachment-extraction.ts.
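For orientation, an item in the attachments repo might look like the following. This is a hedged sketch: the field names mirror the NormalizedAttachment shape from @devrev/ts-adaas, but check your SDK version for the authoritative definition.

```typescript
// Hypothetical example of a record in the attachments repo. Field names
// are assumptions based on the NormalizedAttachment shape; verify them
// against your installed version of @devrev/ts-adaas.
interface AttachmentRecord {
  id: string;         // unique ID of the attachment in the external system
  url: string;        // URL used to stream the attachment content
  file_name: string;  // name the file is saved under after import
  parent_id: string;  // ID of the item (ticket, issue, ...) the attachment belongs to
  author_id?: string; // optional ID of the user who added the attachment
}

const example: AttachmentRecord = {
  id: 'attachment-201',
  url: 'https://example.com/files/attachment-201',
  file_name: 'screenshot.png',
  parent_id: 'ticket-42',
};
```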
Default processing
When no custom processing logic is specified, the default processing logic is used.
const response = await adapter.streamAttachments({
stream: getAttachmentStream, // Function that fetches attachment content
batchSize: 10,
});

Customize processing
Sometimes custom logic is needed for attachment processing. For example, when attachment URLs have short expiration times and would expire before the sync reaches the attachments extraction phase.
In such cases, you can provide custom processors to the streamAttachments function:
- reducer: Transforms attachments into the format you need (grouping, filtering, batching, etc.).
- iterator: Processes the transformed data and calls adapter.processAttachment() for each attachment.
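The split of responsibilities can be illustrated with a plain chunking strategy. Everything below is a simplified stand-in: the real reducer and iterator receive the SDK's adapter and stream (see the type requirements below), while this sketch injects a process callback so it can run on its own.

```typescript
// Simplified sketch of the reducer/iterator contract using chunking.
// The `process` callback stands in for adapter.processAttachment().
type Attachment = { id: string; url: string };

// Reducer: split the incoming batch into chunks of `batchSize`.
const reducer = ({ attachments, batchSize = 10 }: {
  attachments: Attachment[];
  batchSize?: number;
}): Attachment[][] => {
  const chunks: Attachment[][] = [];
  for (let i = 0; i < attachments.length; i += batchSize) {
    chunks.push(attachments.slice(i, i + batchSize));
  }
  return chunks;
};

// Iterator: walk the chunks and process each attachment in turn.
const iterator = async ({ reducedAttachments, process }: {
  reducedAttachments: Attachment[][];
  process: (a: Attachment) => Promise<{ delay?: number; error?: unknown } | undefined>;
}) => {
  for (const chunk of reducedAttachments) {
    for (const attachment of chunk) {
      const result = await process(attachment);
      if (result?.delay) return { delay: result.delay }; // rate limited: retry later
    }
  }
  return undefined; // undefined signals successful completion
};
```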
Processing type requirements
ExternalSystemAttachmentReducerFunction
export type ExternalSystemAttachmentReducerFunction<
Batch,
NewBatch,
ConnectorState,
> = ({
attachments,
adapter,
batchSize,
}: {
attachments: Batch;
adapter: WorkerAdapter<ConnectorState>;
batchSize?: number;
}) => NewBatch;

ExternalSystemAttachmentIteratorFunction
export type ExternalSystemAttachmentIteratorFunction<
NewBatch,
ConnectorState
> = ({
reducedAttachments,
adapter,
stream,
}: {
reducedAttachments: NewBatch;
adapter: WorkerAdapter<ConnectorState>;
stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;

// Example: Custom processors for refreshing expired URLs
import { processTask, ExtractorEventType, NormalizedAttachment } from '@devrev/ts-adaas';
// Custom reducer function - transforms the input attachments array
const reducer = ({ attachments, adapter }) => {
// Group attachments by their parent_id (ticket/task/etc.) to minimize API calls
// Instead of fetching each attachment individually, fetch the parent once
// to get all its attachments with fresh URLs
return attachments.reduce((parentMap, attachment) => {
const parentId = attachment.parent_id;
// Create a new array for this parent if it doesn't exist yet
if (!parentMap.has(parentId)) {
parentMap.set(parentId, []);
}
// Add this attachment to the parent's group
parentMap.get(parentId)!.push(attachment);
return parentMap;
}, new Map<string, NormalizedAttachment[]>());
};
// Custom iterator function - processes the reduced data
const iterator = async ({ reducedAttachments, adapter, stream }) => {
// reducedAttachments is now a Map where:
// - key: parent_id (e.g., ticket ID, issue ID)
// - value: array of attachments belonging to that parent
for (const [parentId, attachments] of reducedAttachments) {
// Fetch the parent item to get fresh attachment URLs
// Implement your custom logic here (API call, URL refresh, token renewal, etc.)
const freshAttachments = await refetchParentAttachments(parentId);
// Process each attachment in this parent group
for (const attachment of attachments) {
// Find the corresponding fresh attachment data
const freshAttachment = freshAttachments.find(fresh => fresh.id === attachment.id);
// Update the attachment with fresh data (URL, filename, etc.)
const updatedAttachment = freshAttachment
? { ...attachment, url: freshAttachment.url, file_name: freshAttachment.file_name }
: attachment; // fallback to original if not found
// Use the SDK's built-in attachment processing
const result = await adapter.processAttachment(updatedAttachment, stream);
// Handle rate limiting - return immediately to trigger retry
if (result?.delay) return { delay: result.delay };
// Log errors but continue processing other attachments
if (result?.error) {
console.warn(`Failed to process attachment ${attachment.id}:`, result.error);
}
}
}
// Return undefined to signal successful completion
return;
};
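refetchParentAttachments is referenced above but never defined. One hypothetical shape for it, assuming the external system can return a parent item together with freshly signed attachment URLs in a single call; fetchJson and the endpoint URL are placeholders, not part of any real API.

```typescript
// Hypothetical implementation of the refetchParentAttachments helper used
// above. The fetchJson parameter is injected (e.g. a thin wrapper around
// your HTTP client) so the function stays easy to test in isolation.
type FreshAttachment = { id: string; url: string; file_name: string };

const makeRefetchParentAttachments = (
  fetchJson: (url: string) => Promise<{ attachments: FreshAttachment[] }>,
) => async (parentId: string): Promise<FreshAttachment[]> => {
  // One call per parent returns fresh URLs for all of its attachments,
  // which is why the reducer groups attachments by parent_id first.
  const parent = await fetchJson(`https://api.example.com/items/${parentId}?expand=attachments`);
  return parent.attachments;
};
```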
// Example usage within a processTask worker
processTask({
task: async ({ adapter }) => {
try {
const response = await adapter.streamAttachments({
stream: getAttachmentStream, // Function that fetches attachment content
processors: {
reducer, // Transform attachments into groups
iterator // Process groups with custom logic
}
});
// ...
},
// ...
});
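The examples above pass a getAttachmentStream function without defining it. Below is a minimal sketch of what such a streaming function could look like, under the assumption that the external system serves attachment bytes over plain HTTP. The httpGet fetcher is injected (in practice it might wrap axios.get(url, { responseType: 'stream' })), and the return shape loosely follows the { httpStream, delay, error } convention seen in the iterator example; treat the details as assumptions rather than the SDK's exact contract.

```typescript
// Hypothetical attachment streaming function with injected HTTP fetcher.
type StreamResult = { httpStream?: unknown; delay?: number; error?: { message: string } };

const makeAttachmentStreamer = (
  httpGet: (url: string) => Promise<{ status: number; headers: Record<string, string>; data: unknown }>,
) => async ({ item }: { item: { id: string; url: string } }): Promise<StreamResult> => {
  try {
    const response = await httpGet(item.url);
    if (response.status === 429) {
      // Rate limited: ask the framework to retry after the server-suggested delay.
      return { delay: Number(response.headers['retry-after'] ?? 5) };
    }
    if (response.status >= 400) {
      return { error: { message: `Failed to fetch attachment ${item.id}: HTTP ${response.status}` } };
    }
    return { httpStream: response.data }; // readable stream of the attachment body
  } catch (e) {
    return { error: { message: `Error while fetching attachment ${item.id}: ${String(e)}` } };
  }
};
```

Injecting the fetcher keeps the rate-limit and error handling visible and testable without touching the network.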