Deploy extraction strategies to Python transforms

After validating your extraction strategy in AIP Document Intelligence, you can deploy it as a Python transform to run batch extraction across all pages of all documents in a media set. The deployed template produces the same results as the corresponding configuration in AIP Document Intelligence.

Using the deployed template

Once the template is created in Code Repositories:

  1. Specify your output dataset in the @transform.using decorator in the src/myproject/document_extraction/my_extraction.py file.
  2. Trigger the build.

The template uses lightweight transforms for optimal performance. Legacy versions used Spark-based transforms, which are much slower due to Spark overhead. We recommend migrating to lightweight transforms if you have not already.

Preview mode is not yet supported in this template. Errors are expected when using preview, but the actual build will work correctly.

Incremental processing

By default, the document extraction transform is non-incremental: all documents are processed on every run. To make the transform incremental, uncomment the @incremental(...) decorator line. When new documents are added to the input media set, re-running an incremental transform processes only the new documents and appends the results to the output dataset.

Customizing the prompt

For generative AI configurations, the template inherits the prompt you specified in AIP Document Intelligence. You can view the prompts in src/myproject/document_extraction/prompts.py.
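The actual contents of prompts.py are generated from your Document Intelligence configuration; the following sketch only illustrates its shape, and the prompt text shown is a made-up placeholder:

```python
# src/myproject/document_extraction/prompts.py -- illustrative shape only;
# the real file contains the prompt text generated from your
# Document Intelligence configuration.
USER_PROMPT = """Extract the content of this page as Markdown.
Preserve headings, lists, and tables."""

SYSTEM_PROMPT = """You are a document extraction assistant.
Return only the extracted content, with no commentary."""
```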

We do not recommend editing prompts directly in the template, as this causes discrepancies between Document Intelligence results and batch job results. Instead, make prompt adjustments in Document Intelligence, verify the results there, then redeploy to create a new template.

| Transform input type | Customizable prompt |
| --- | --- |
| VisionLLMDocumentsExtractorInput | User prompt only (system prompt is fixed) |
| VisionLLMLayoutDocumentsExtractorInput (layout-aware extraction) | System prompt only (user prompt is fixed) |

For layout-aware extraction configurations, the user prompt must remain fixed because it contains a special JSON schema that preserves layout structure information. Modifying this prompt will significantly reduce extraction success rates.

Custom image preprocessing

For documents that require image transformations before extraction, such as pages with rotated text, you should:

  1. Create a separate transform pipeline to apply the image transformations.
  2. Save the processed results to a new media set.
  3. Use Document Intelligence on the processed media set for extraction.
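As an illustration of the transformation in step 1 only, here is a minimal, dependency-free sketch of rotating a rasterized page represented as a row-major 2-D array; in a real pipeline you would use an imaging library such as Pillow and write the corrected pages back to a new media set:

```python
def rotate90_cw(raster):
    """Rotate a row-major page raster 90 degrees clockwise."""
    # Reverse the rows, then transpose: column i of the reversed
    # raster becomes row i of the rotated raster.
    return [list(row) for row in zip(*raster[::-1])]


# Toy 3x2 "page" standing in for pixel data.
page = [
    [1, 2],
    [3, 4],
    [5, 6],
]
print(rotate90_cw(page))  # [[5, 3, 1], [6, 4, 2]]
```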

Run on a subset of media items

For layout-aware generative AI configurations, the vision LLM must generate valid JSON adhering to a specific schema. If the response is invalid JSON or does not follow the schema, the extraction fails with an ERROR_RESPONSE_JSON_PARSING error.

In practice, approximately 5% of extractions may fail this way even with top-tier models. Failed rows still contain valid layoutInfo, with extraction results from the layout model only.

To re-run extraction on failed rows:

  1. Use the filter_on_media_items argument with a list of media item IDs to process only specific items.
  2. Remove the @incremental decorator so these rows are re-processed instead of being detected as finished.
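The steps above can be sketched as follows. This assumes you have exported rows from the output dataset and that failed rows carry the ERROR_RESPONSE_JSON_PARSING marker in an error column; the column name and row shape here are assumptions for illustration, as the exact output schema depends on your configuration:

```python
# Rows exported from the extraction output dataset; the "error" column
# name and values are assumptions for illustration.
rows = [
    {"media_item_rid": "ri.mio.main.media-item.aaa", "error": None},
    {"media_item_rid": "ri.mio.main.media-item.bbb", "error": "ERROR_RESPONSE_JSON_PARSING"},
    {"media_item_rid": "ri.mio.main.media-item.ccc", "error": "ERROR_RESPONSE_JSON_PARSING"},
]

# Collect the media item RIDs of the failed rows (step 1).
failed_rids = sorted(
    {r["media_item_rid"] for r in rows if r["error"] == "ERROR_RESPONSE_JSON_PARSING"}
)
print(failed_rids)
# ['ri.mio.main.media-item.bbb', 'ri.mio.main.media-item.ccc']

# The list would then be passed to the extractor in the transform, e.g.:
# extracted_data = extractor.create_extraction(
#     media_input,
#     filter_on_media_items=failed_rids,
#     ...
# )
```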

Improve runtime performance

The THREAD_NUMBER parameter controls the number of concurrent threads; each thread extracts data from one document page at a time. Higher values result in faster job completion.

| Setting | Value | Notes |
| --- | --- | --- |
| Default | 20 | Conservative setting suitable for most environments |
| Maximum tested | 300 | Achievable in development environments with abundant Vision LLM capacity |

Setting the THREAD_NUMBER value too high in capacity-restricted environments will cause rate limit errors. The retry loop then consumes significant capacity, affecting other jobs using the same model. You should monitor usage when adjusting this parameter.
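As a rough back-of-envelope model of why THREAD_NUMBER matters, assuming a fixed per-page latency and ignoring rate limiting entirely (both are simplifying assumptions, and the 6-second latency is a made-up figure):

```python
pages = 10_000
seconds_per_page = 6  # assumed average per-page extraction latency

# Estimated wall-clock time if all threads stay busy and are never throttled.
for threads in (20, 100, 300):
    minutes = pages * seconds_per_page / threads / 60
    print(f"{threads:>3} threads -> ~{minutes:.0f} min")
```

The model is linear in thread count, which is exactly why it breaks down in capacity-restricted environments: once rate limits kick in, extra threads only add retries.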

Template examples

The following examples show the transform code generated for each extraction configuration. These are for reference only. You should create the transform using the deploy tool in Document Intelligence instead of manually writing transform code for document extraction.

Traditional extraction: Raw text

Extracts text by reading document metadata. Only available for electronically generated PDFs.

```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor

from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with raw text extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_document_to_text_raw(
                media_item_rid, page_num
            ).read().decode("utf-8")
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )
    output.write_dataframe(extracted_data)
```

Traditional extraction: OCR

Uses traditional Optical Character Recognition (OCR) to extract text without preserving layout information.

```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor

from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with OCR text extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_document_to_text_ocr_output_text(
                media_item_rid, page_num
            ).read().decode("utf-8")
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )
    output.write_dataframe(extracted_data)
```

Traditional extraction: Layout-aware OCR

Uses advanced OCR with bounding boxes to preserve document layout and structure.

```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor

from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with layout-aware OCR extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_media_item(media_item_rid, str(page_num), {
                "type": "documentToText",
                "documentToText": {
                    "operation": {
                        "type": "extractLayoutAwareContent",
                        "extractLayoutAwareContent": {
                            "parameters": {
                                "languages": ["ENG"]
                            }
                        }
                    }
                }
            })
            extraction_result = str(extraction_result.json())
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )
    output.write_dataframe(extracted_data)
```

Generative AI extraction: Basic

Uses a vision language model to extract content as Markdown without preprocessing.

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput

from aip_workflows.document_intelligence.transforms import VisionLLMDocumentsExtractorInput

from .prompts import USER_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input, with_ocr=False, prompt=USER_PROMPT, thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```

Generative AI extraction: With OCR preprocessing

Uses a vision language model with OCR preprocessing for improved extraction on complex documents.

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput

from aip_workflows.document_intelligence.transforms import VisionLLMDocumentsExtractorInput

from .prompts import USER_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input, with_ocr=True, prompt=USER_PROMPT, thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```

Generative AI extraction: Layout-aware

Uses a vision language model with layout-aware OCR preprocessing, returning layout information alongside extracted content.

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput

from aip_workflows.document_intelligence.transforms import VisionLLMLayoutDocumentsExtractorInput

from .prompts import SYSTEM_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMLayoutDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        include_layout_info="no_overlay",
        system_prompt=SYSTEM_PROMPT,
        thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```

Generative AI extraction: Layout-aware with table cropping

Uses a vision language model with layout-aware OCR preprocessing and table cropping for improved table extraction accuracy.

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput

from aip_workflows.document_intelligence.transforms import VisionLLMLayoutDocumentsExtractorInput

from .prompts import SYSTEM_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMLayoutDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        include_layout_info="crop_tables",
        system_prompt=SYSTEM_PROMPT,
        thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```