Documentation Index Fetch the complete documentation index at: https://documentation.datalab.to/llms.txt
Use this file to discover all available pages before exploring further.
Process directories of documents with the SDK or CLI. Both handle rate limiting and retries automatically.
SDK Batch Processing
Process multiple files using Python’s async capabilities:
Async Batch Processing
For higher throughput:
import asyncio
from pathlib import Path
from datalab_sdk import AsyncDatalabClient, ConvertOptions
async def process_directory ( input_dir : str , output_dir : str ):
async with AsyncDatalabClient() as client:
pdf_files = list (Path(input_dir).glob( "*.pdf" ))
# Process all files concurrently
tasks = [
client.convert( str (pdf), options = ConvertOptions( mode = "balanced" ))
for pdf in pdf_files
]
results = await asyncio.gather( * tasks, return_exceptions = True )
for pdf, result in zip (pdf_files, results):
if isinstance (result, Exception ):
print ( f "Error processing { pdf.name } : { result } " )
else :
output_path = Path(output_dir) / f " { pdf.stem } .md"
output_path.write_text(result.markdown)
print ( f "Saved: { output_path } " )
asyncio.run(process_directory( "./documents/" , "./output/" ))
CLI Batch Processing
The CLI handles directory processing automatically:
# Convert all PDFs in a directory
datalab convert ./documents/ --output_dir ./output/
# Filter by extension
datalab convert ./documents/ --extensions pdf,docx
# Control concurrency
datalab convert ./documents/ --max_concurrent 10
# With processing options
datalab convert ./documents/ \
--mode balanced \
--format markdown \
--output_dir ./output/
See CLI Reference for all options.
REST API Batch Processing
For raw API usage, implement parallel requests with retry handling:
import os
import time
import requests
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter, Retry
API_URL = "https://www.datalab.to/api/v1/convert"
API_KEY = os.getenv( "DATALAB_API_KEY" )
# Configure session with retries
session = requests.Session()
retries = Retry(
total = 20 ,
backoff_factor = 4 ,
status_forcelist = [ 429 ],
allowed_methods = [ "GET" , "POST" ],
raise_on_status = False ,
)
session.mount( "https://" , HTTPAdapter( max_retries = retries))
def convert_document ( pdf_path : Path, output_format = "markdown" , mode = "balanced" ):
"""Convert a single document with polling."""
headers = { "X-API-Key" : API_KEY }
# Submit request
with open (pdf_path, "rb" ) as f:
response = session.post(
API_URL ,
files = { "file" : (pdf_path.name, f, "application/pdf" )},
data = { "output_format" : output_format, "mode" : mode},
headers = headers
)
data = response.json()
check_url = data[ "request_check_url" ]
# Poll for completion
for _ in range ( 300 ):
result = session.get(check_url, headers = headers).json()
if result[ "status" ] == "complete" :
return result
elif result[ "status" ] == "failed" :
raise Exception ( f "Failed: { result.get( 'error' ) } " )
time.sleep( 2 )
raise Exception ( "Timeout" )
def batch_convert ( directory : str , max_workers : int = 5 ):
"""Process all PDFs in a directory."""
doc_dir = Path(directory)
pdfs = list (doc_dir.glob( "*.pdf" ))
print ( f "Found { len (pdfs) } PDFs" )
results = {}
with ThreadPoolExecutor( max_workers = max_workers) as executor:
futures = {
executor.submit(convert_document, pdf): pdf.name
for pdf in pdfs
}
for future in as_completed(futures):
filename = futures[future]
try :
result = future.result()
results[filename] = result
print ( f "Converted: { filename } " )
except Exception as e:
print ( f "Error processing { filename } : { e } " )
return results
# Usage
results = batch_convert( "./documents/" , max_workers = 5 )
Rate Limits
Request rate limit: 200 requests per minute per account (429 on exceed)
Concurrent request limit: 400 concurrent requests (429 on exceed)
Page concurrency limit: 5,000 pages in flight across all requests — this is enforced during processing, not at submission. Results return with success: false if exceeded. Always check the success field when polling for results.
The SDK and CLI handle request rate limiting and retries automatically
For higher limits, contact support@datalab.to
See API Limits for details.
Tips
Use async for high throughput - Async processing handles many concurrent requests efficiently
Limit concurrency - Start with 5-10 concurrent requests and adjust based on your rate limits
Handle failures gracefully - Use return_exceptions=True with asyncio.gather to continue processing on errors
Save progress - Write results incrementally to avoid losing work on long batches
Next Steps
Document Conversion Learn more about Marker’s conversion API and output formats.
API Limits Understand rate limits and how to optimize throughput.
Pipelines Chain processors into versioned, reusable pipelines.
Webhooks Get notified when batch conversions complete via webhooks.