⏱️ Estimated reading time: 18 min

Introduction

In modern AI systems, the performance of RAG (Retrieval-Augmented Generation) and LLM-based applications depends heavily on the quality of input data. Extracting meaningful information from complex-structured documents such as PDFs, PowerPoint, and Word files, and chunking that information appropriately, remains a significant challenge.

Traditional plain-text extraction approaches often lose the structural information embedded in document layouts, tables, images, and charts, making it difficult for LLMs to understand context properly. Chunkr was built to address these problems: it is a vision-based document intelligence API platform that accurately captures document structure and semantics to generate data optimized for RAG systems.

This post covers Chunkr’s core features, how to implement it, and a practical guide to extracting high-quality data from a variety of document formats.

Chunkr Core Technologies

1. Advanced Layout Analysis

Layout analysis is one of Chunkr’s most powerful features, accurately identifying and classifying the structural elements of a document.

Supported Layout Elements

  • Text blocks: distinctions between headings, body text, and captions
  • Table structure: data extraction that preserves row/column relationships
  • Images and charts: understanding of position and caption relationships
  • List structure: hierarchical structure of ordered and unordered lists
  • Multi-column layouts: handling of newspaper/magazine-style multi-column documents

Layout Analysis Example

# Layout analysis result structure
layout_elements = {
    "headers": [
        {"text": "Chapter 1: Introduction", "level": 1, "bbox": [50, 100, 500, 130]},
        {"text": "1.1 Overview", "level": 2, "bbox": [50, 150, 300, 180]}
    ],
    "paragraphs": [
        {"text": "This document explains...", "bbox": [50, 200, 500, 280]}
    ],
    "tables": [
        {
            "bbox": [50, 300, 500, 450],
            "rows": 5,
            "columns": 3,
            "data": [["Name", "Age", "City"], ["John", "25", "Seoul"]]
        }
    ],
    "images": [
        {"bbox": [50, 500, 300, 700], "caption": "Figure 1: System Architecture"}
    ]
}

2. Precise OCR and Bounding Boxes

The OCR (Optical Character Recognition) feature goes beyond simple text extraction to provide exact positional information for each character.

OCR Highlights

  • High accuracy: text recognition based on state-of-the-art vision models
  • Multilingual support: Korean, English, Chinese, Japanese, and more
  • Bounding boxes: precise coordinate information for each piece of text
  • Font metadata: size, style, color, and other typographic details
  • Rotated text: accurate recognition of tilted text

3. Semantic Chunking

Semantic chunking is an intelligent splitting approach that considers semantic relevance rather than simple character or word counts.

Chunking Strategies

# Semantic chunking configuration example
chunking_config = {
    "strategy": "semantic",
    "max_chunk_size": 1000,  # maximum token count
    "overlap": 200,          # overlap between chunks
    "preserve_structure": True,  # preserve structure
    "semantic_threshold": 0.7,   # semantic similarity threshold
    "respect_boundaries": [
        "paragraph", "section", "table", "list"
    ]
}

Installation and Basic Setup

1. Cloud API Approach

The simplest way to get started is to use the cloud service at chunkr.ai.

# Install the Python SDK
pip install chunkr-ai

# Basic usage
from chunkr_ai import Chunkr

# Initialize with your API key
chunkr = Chunkr(api_key="your_api_key_from_chunkr_ai")

# Upload and process a document
url = "https://example.com/document.pdf"
task = chunkr.upload(url)

# Extract results in multiple formats
html = task.html(output_file="output.html")
markdown = task.markdown(output_file="output.md")
content = task.content(output_file="output.txt")
json_data = task.json(output_file="output.json")

# Clean up resources
chunkr.close()

2. Docker-Based Self-Hosting

If privacy or customization is a priority, you can choose self-hosting.

# Clone the project
git clone https://github.com/lumina-ai-inc/chunkr
cd chunkr

# Configure environment
cp .env.example .env
cp models.example.yaml models.yaml

# Run in a GPU environment
docker compose up -d

# Run in a CPU-only environment
docker compose -f compose.yaml -f compose.cpu.yaml up -d

# macOS ARM (M1/M2/M3) environment
docker compose -f compose.yaml -f compose.cpu.yaml -f compose.mac.yaml up -d

3. Service Endpoints by Environment

# Service access addresses
Web UI: http://localhost:5173
API: http://localhost:8000
API Documentation: http://localhost:8000/docs
Health Check: http://localhost:8000/health

Advanced Document Processing Features

1. Support for Multiple Document Formats

Chunkr supports a wide range of document formats, providing processing optimized for each format.

# Document format-specific processing examples
from chunkr_ai import Chunkr

class DocumentProcessor:
    def __init__(self, api_key):
        self.chunkr = Chunkr(api_key=api_key)
    
    def process_document(self, file_path, doc_type="auto"):
        """Format-optimized document processing"""
        
        # Format-specific configuration
        processing_configs = {
            "pdf": {
                "ocr_strategy": "auto",  # auto-detect text/scanned documents
                "preserve_layout": True,
                "extract_images": True,
                "table_detection": True
            },
            "pptx": {
                "slide_separation": True,  # separate by slide
                "preserve_animations": False,
                "extract_speaker_notes": True,
                "image_extraction": True
            },
            "docx": {
                "preserve_styles": True,   # preserve style information
                "extract_headers_footers": True,
                "table_structure": True,
                "comment_extraction": False
            },
            "xlsx": {
                "sheet_separation": True,  # separate by sheet
                "preserve_formulas": True,
                "data_validation": True,
                "chart_extraction": True
            }
        }
        
        # Upload and process document
        task = self.chunkr.upload(
            file_path,
            config=processing_configs.get(doc_type, {})
        )
        
        return task
    
    def extract_structured_data(self, task):
        """Extract structured data"""
        
        # Get detailed structure information in JSON format
        json_result = task.json()
        
        # Separate data by structure
        structured_data = {
            "metadata": json_result.get("metadata", {}),
            "pages": [],
            "tables": [],
            "images": [],
            "text_blocks": []
        }
        
        for page in json_result.get("pages", []):
            page_data = {
                "page_number": page.get("page_number"),
                "dimensions": page.get("dimensions"),
                "elements": []
            }
            
            # Classify elements within the page
            for element in page.get("elements", []):
                element_type = element.get("type")
                
                if element_type == "table":
                    structured_data["tables"].append({
                        "page": page.get("page_number"),
                        "bbox": element.get("bbox"),
                        "data": element.get("data"),
                        "headers": element.get("headers")
                    })
                
                elif element_type == "image":
                    structured_data["images"].append({
                        "page": page.get("page_number"),
                        "bbox": element.get("bbox"),
                        "caption": element.get("caption"),
                        "alt_text": element.get("alt_text")
                    })
                
                elif element_type in ["paragraph", "header", "footer"]:
                    structured_data["text_blocks"].append({
                        "page": page.get("page_number"),
                        "type": element_type,
                        "text": element.get("text"),
                        "bbox": element.get("bbox"),
                        "style": element.get("style", {})
                    })
                
                page_data["elements"].append(element)
            
            structured_data["pages"].append(page_data)
        
        return structured_data

# Usage example
processor = DocumentProcessor("your_api_key")

# Process a PDF document
pdf_task = processor.process_document("report.pdf", "pdf")
pdf_data = processor.extract_structured_data(pdf_task)

# Process a PowerPoint document
ppt_task = processor.process_document("presentation.pptx", "pptx")
ppt_data = processor.extract_structured_data(ppt_task)

2. RAG-Optimized Chunking Strategies

Advanced chunking strategies can be implemented to maximize RAG system performance.

# RAG-optimized chunking implementation
class RAGOptimizedChunker:
    def __init__(self, chunkr_instance):
        self.chunkr = chunkr_instance
    
    def create_rag_chunks(self, task, chunk_strategy="adaptive"):
        """Create chunks optimized for RAG systems"""
        
        # Extract structured data
        structured_data = task.json()
        
        chunks = []
        
        if chunk_strategy == "adaptive":
            chunks = self._adaptive_chunking(structured_data)
        elif chunk_strategy == "semantic":
            chunks = self._semantic_chunking(structured_data)
        elif chunk_strategy == "hierarchical":
            chunks = self._hierarchical_chunking(structured_data)
        
        return chunks
    
    def _adaptive_chunking(self, data):
        """Adaptive chunking: dynamically adjust by content type"""
        
        chunks = []
        
        for page in data.get("pages", []):
            for element in page.get("elements", []):
                element_type = element.get("type")
                
                if element_type == "table":
                    # Tables become a single chunk
                    chunks.append({
                        "type": "table",
                        "content": self._format_table(element),
                        "metadata": {
                            "page": page.get("page_number"),
                            "bbox": element.get("bbox"),
                            "element_type": "table"
                        }
                    })
                
                elif element_type == "header":
                    # Chunk headers together with following paragraphs
                    header_chunk = self._create_header_chunk(element, page)
                    chunks.append(header_chunk)
                
                elif element_type == "paragraph":
                    # Split or merge based on paragraph length
                    para_chunks = self._split_paragraph(element)
                    chunks.extend(para_chunks)
        
        return chunks
    
    def _semantic_chunking(self, data):
        """Semantic chunking: group by topic"""
        
        # Extract full text
        full_text = self._extract_full_text(data)
        
        # Compute semantic similarity based on embeddings
        semantic_chunks = self._compute_semantic_boundaries(full_text)
        
        return semantic_chunks
    
    def _hierarchical_chunking(self, data):
        """Hierarchical chunking: based on document structure"""
        
        chunks = []
        current_section = None
        
        for page in data.get("pages", []):
            for element in page.get("elements", []):
                if element.get("type") == "header":
                    level = element.get("style", {}).get("level", 1)
                    
                    if level == 1:
                        # Start a new section
                        if current_section:
                            chunks.append(current_section)
                        
                        current_section = {
                            "type": "section",
                            "title": element.get("text"),
                            "content": [],
                            "subsections": []
                        }
                    
                    elif level == 2 and current_section:
                        # Add a subsection
                        current_section["subsections"].append({
                            "title": element.get("text"),
                            "content": []
                        })
                
                else:
                    # Add content to the current section
                    if current_section:
                        if current_section["subsections"]:
                            current_section["subsections"][-1]["content"].append(element)
                        else:
                            current_section["content"].append(element)
        
        # Append the last section
        if current_section:
            chunks.append(current_section)
        
        return chunks
    
    def _format_table(self, table_element):
        """Convert table data to a RAG-friendly format"""
        
        data = table_element.get("data", [])
        if not data:
            return ""
        
        # Separate headers and data
        headers = data[0] if data else []
        rows = data[1:] if len(data) > 1 else []
        
        # Convert to Markdown table format
        markdown_table = "| " + " | ".join(headers) + " |\n"
        markdown_table += "| " + " | ".join(["---"] * len(headers)) + " |\n"
        
        for row in rows:
            markdown_table += "| " + " | ".join(str(cell) for cell in row) + " |\n"
        
        return markdown_table
    
    def optimize_for_retrieval(self, chunks, max_tokens=500):
        """Post-process chunks for retrieval optimization"""
        
        optimized_chunks = []
        
        for chunk in chunks:
            # Check and adjust token count
            if self._count_tokens(chunk.get("content", "")) > max_tokens:
                # Split large chunks
                sub_chunks = self._split_large_chunk(chunk, max_tokens)
                optimized_chunks.extend(sub_chunks)
            else:
                optimized_chunks.append(chunk)
        
        # Enrich metadata
        for i, chunk in enumerate(optimized_chunks):
            chunk["metadata"]["chunk_id"] = i
            chunk["metadata"]["total_chunks"] = len(optimized_chunks)
            
            # Extract search keywords
            chunk["metadata"]["keywords"] = self._extract_keywords(
                chunk.get("content", "")
            )
        
        return optimized_chunks

# Usage example
chunker = RAGOptimizedChunker(chunkr)

# Process document and create RAG-optimized chunks
task = chunkr.upload("complex_document.pdf")
rag_chunks = chunker.create_rag_chunks(task, "adaptive")

# Retrieval optimization
optimized_chunks = chunker.optimize_for_retrieval(rag_chunks, max_tokens=400)

LLM Integration and Configuration

1. Configuring Various LLM Providers

Chunkr integrates with a variety of LLM providers to leverage AI capabilities during document processing.

# models.yaml configuration file
models:
  # OpenAI GPT models
  - id: gpt-4o
    model: gpt-4o
    provider_url: https://api.openai.com/v1/chat/completions
    api_key: "your_openai_api_key"
    default: true
    rate-limit: 200  # requests per minute
    
  # Google Gemini models
  - id: gemini-pro
    model: gemini-1.5-pro
    provider_url: https://generativelanguage.googleapis.com/v1beta/openai/chat/completions
    api_key: "your_google_api_key"
    rate-limit: 100
    
  # Anthropic Claude models
  - id: claude-3-sonnet
    model: claude-3-5-sonnet-20241022
    provider_url: https://api.anthropic.com/v1/messages
    api_key: "your_anthropic_api_key"
    rate-limit: 50
    
  # Local models (vLLM or Ollama)
  - id: local-llama
    model: llama-3.1-8b-instruct
    provider_url: http://localhost:8000/v1/chat/completions
    api_key: "not_required"
    rate-limit: 1000
    
  # OpenRouter (access to a variety of models)
  - id: openrouter-mixtral
    model: mistralai/mixtral-8x7b-instruct
    provider_url: https://openrouter.ai/api/v1/chat/completions
    api_key: "your_openrouter_api_key"
    rate-limit: 150

# Environment-specific settings
environments:
  development:
    default_model: local-llama
    fallback_model: gpt-4o
  
  production:
    default_model: gpt-4o
    fallback_model: gemini-pro

2. LLM-Enhanced Document Analysis

# LLM-integrated document analysis class
class LLMEnhancedDocumentAnalyzer:
    def __init__(self, chunkr_instance, llm_config):
        self.chunkr = chunkr_instance
        self.llm_config = llm_config
    
    def analyze_document_content(self, task):
        """Document content analysis using LLM"""
        
        # Extract basic structured data
        structured_data = task.json()
        
        # LLM analysis results
        analysis = {
            "summary": self._generate_summary(structured_data),
            "key_topics": self._extract_topics(structured_data),
            "entities": self._extract_entities(structured_data),
            "sentiment": self._analyze_sentiment(structured_data),
            "document_type": self._classify_document(structured_data),
            "quality_score": self._assess_quality(structured_data)
        }
        
        return analysis
    
    def _generate_summary(self, data):
        """Generate document summary"""
        
        full_text = self._extract_text_content(data)
        
        prompt = f"""
        Summarize the key content of the following document in 3-4 sentences:
        
        {full_text[:2000]}...
        
        Summary:
        """
        
        return self._call_llm(prompt, model="gpt-4o")
    
    def _extract_topics(self, data):
        """Extract key topics"""
        
        full_text = self._extract_text_content(data)
        
        prompt = f"""
        Extract the main topics from the following document and present them as a list:
        
        {full_text[:1500]}...
        
        Main topics (up to 5):
        """
        
        response = self._call_llm(prompt, model="gemini-pro")
        return self._parse_topics(response)
    
    def _extract_entities(self, data):
        """Named entity recognition"""
        
        full_text = self._extract_text_content(data)
        
        prompt = f"""
        Extract important named entities from the following text:
        - Person (PERSON)
        - Organization (ORGANIZATION)
        - Location (LOCATION)
        - Date (DATE)
        - Technology/Product (TECHNOLOGY)
        
        Text:
        {full_text[:1000]}...
        
        Respond in JSON format:
        """
        
        response = self._call_llm(prompt, model="claude-3-sonnet")
        return self._parse_entities(response)
    
    def _assess_quality(self, data):
        """Assess document quality"""
        
        metrics = {
            "text_clarity": 0.0,
            "structure_quality": 0.0,
            "content_depth": 0.0,
            "readability": 0.0,
            "completeness": 0.0
        }
        
        # Quality assessment based on OCR accuracy
        ocr_confidence = self._calculate_ocr_confidence(data)
        
        # Structural element assessment
        structure_score = self._evaluate_structure(data)
        
        # LLM-based content quality assessment
        content_score = self._evaluate_content_quality(data)
        
        overall_score = (ocr_confidence + structure_score + content_score) / 3
        
        return {
            "overall_score": overall_score,
            "metrics": metrics,
            "recommendations": self._get_quality_recommendations(overall_score)
        }
    
    def _call_llm(self, prompt, model="gpt-4o"):
        """Call the LLM API"""
        import requests
        
        model_config = next(
            (m for m in self.llm_config["models"] if m["id"] == model), 
            None
        )
        
        if not model_config:
            raise ValueError(f"Model {model} not found in configuration")
        
        headers = {
            "Authorization": f"Bearer {model_config['api_key']}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model_config["model"],
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "max_tokens": 500,
            "temperature": 0.1
        }
        
        response = requests.post(
            model_config["provider_url"],
            headers=headers,
            json=payload
        )
        
        if response.status_code == 200:
            return response.json()["choices"][0]["message"]["content"]
        else:
            raise Exception(f"LLM API call failed: {response.text}")

# Usage example
llm_config = {
    "models": [
        {
            "id": "gpt-4o",
            "model": "gpt-4o",
            "provider_url": "https://api.openai.com/v1/chat/completions",
            "api_key": "your_openai_api_key"
        }
    ]
}

analyzer = LLMEnhancedDocumentAnalyzer(chunkr, llm_config)

# Run document analysis
task = chunkr.upload("research_paper.pdf")
analysis = analyzer.analyze_document_content(task)

print(f"Document summary: {analysis['summary']}")
print(f"Key topics: {analysis['key_topics']}")
print(f"Quality score: {analysis['quality_score']['overall_score']:.2f}")

Real-World Use Cases and Implementations

1. Academic Paper Processing Pipeline

# Dedicated processing pipeline for academic papers
class AcademicPaperProcessor:
    def __init__(self, chunkr_instance):
        self.chunkr = chunkr_instance
    
    def process_research_paper(self, paper_path):
        """Dedicated processing for academic papers"""
        
        # Process document
        task = self.chunkr.upload(paper_path)
        structured_data = task.json()
        
        # Analyze paper structure
        paper_structure = self._analyze_paper_structure(structured_data)
        
        # Process each section
        processed_sections = {}
        
        for section_name, section_data in paper_structure.items():
            if section_name == "abstract":
                processed_sections[section_name] = self._process_abstract(section_data)
            elif section_name == "introduction":
                processed_sections[section_name] = self._process_introduction(section_data)
            elif section_name == "methodology":
                processed_sections[section_name] = self._process_methodology(section_data)
            elif section_name == "results":
                processed_sections[section_name] = self._process_results(section_data)
            elif section_name == "discussion":
                processed_sections[section_name] = self._process_discussion(section_data)
            elif section_name == "references":
                processed_sections[section_name] = self._process_references(section_data)
        
        # Extract metadata
        metadata = self._extract_paper_metadata(structured_data)
        
        return {
            "metadata": metadata,
            "sections": processed_sections,
            "figures": self._extract_figures(structured_data),
            "tables": self._extract_tables(structured_data),
            "equations": self._extract_equations(structured_data),
            "citations": self._extract_citations(structured_data)
        }
    
    def _analyze_paper_structure(self, data):
        """Automatic paper structure recognition"""
        
        sections = {}
        current_section = None
        
        section_keywords = {
            "abstract": ["abstract", "요약"],
            "introduction": ["introduction", "서론", "1. introduction"],
            "methodology": ["methodology", "method", "방법론", "실험방법"],
            "results": ["results", "result", "결과", "실험결과"],
            "discussion": ["discussion", "토론", "논의"],
            "conclusion": ["conclusion", "결론"],
            "references": ["references", "참고문헌", "bibliography"]
        }
        
        for page in data.get("pages", []):
            for element in page.get("elements", []):
                if element.get("type") == "header":
                    header_text = element.get("text", "").lower()
                    
                    for section_type, keywords in section_keywords.items():
                        if any(keyword in header_text for keyword in keywords):
                            current_section = section_type
                            if current_section not in sections:
                                sections[current_section] = []
                            break
                
                elif current_section and element.get("type") in ["paragraph", "table", "image"]:
                    sections[current_section].append(element)
        
        return sections
    
    def _extract_paper_metadata(self, data):
        """Extract paper metadata"""
        
        # Extract metadata from the first page
        first_page = data.get("pages", [{}])[0]
        elements = first_page.get("elements", [])
        
        metadata = {
            "title": "",
            "authors": [],
            "affiliations": [],
            "keywords": [],
            "doi": "",
            "publication_date": "",
            "journal": "",
            "abstract": ""
        }
        
        # Extract title (typically the first large-font text)
        for element in elements[:5]:  # look in the first 5 elements
            if element.get("type") == "header":
                style = element.get("style", {})
                if style.get("font_size", 0) > 16:
                    metadata["title"] = element.get("text", "")
                    break
        
        # Extract author information from elements after the title
        # Identified by email patterns or university name patterns
        
        return metadata

# Usage example
processor = AcademicPaperProcessor(chunkr)
paper_data = processor.process_research_paper("research_paper.pdf")

print(f"Paper title: {paper_data['metadata']['title']}")
print(f"Number of sections: {len(paper_data['sections'])}")
print(f"Number of figures: {len(paper_data['figures'])}")
print(f"Number of tables: {len(paper_data['tables'])}")

2. Corporate Document Auto-Classification System

# Corporate document auto-classification and processing
class CorporateDocumentClassifier:
    def __init__(self, chunkr_instance):
        self.chunkr = chunkr_instance
        self.document_types = {
            "contract": ["계약서", "계약", "agreement", "contract"],
            "report": ["보고서", "report", "analysis", "분석"],
            "manual": ["매뉴얼", "manual", "guide", "가이드"],
            "presentation": ["발표", "presentation", "slide"],
            "financial": ["재무", "financial", "budget", "예산"],
            "hr": ["인사", "hr", "human resource", "채용"],
            "legal": ["법무", "legal", "compliance", "규정"]
        }
    
    def classify_and_process(self, document_path):
        """Classify document and process by type"""
        
        # Basic document processing
        task = self.chunkr.upload(document_path)
        structured_data = task.json()
        
        # Classify document type
        doc_type = self._classify_document_type(structured_data)
        
        # Type-specific processing
        if doc_type == "contract":
            return self._process_contract(structured_data)
        elif doc_type == "report":
            return self._process_report(structured_data)
        elif doc_type == "financial":
            return self._process_financial_document(structured_data)
        else:
            return self._process_generic_document(structured_data)
    
    def _classify_document_type(self, data):
        """Automatic document type classification"""
        
        # Extract full text
        full_text = ""
        for page in data.get("pages", []):
            for element in page.get("elements", []):
                if element.get("type") in ["paragraph", "header"]:
                    full_text += element.get("text", "") + " "
        
        full_text = full_text.lower()
        
        # Classify via keyword matching
        type_scores = {}
        
        for doc_type, keywords in self.document_types.items():
            score = sum(1 for keyword in keywords if keyword in full_text)
            type_scores[doc_type] = score
        
        # Return the type with the highest score
        return max(type_scores, key=type_scores.get) if type_scores else "generic"
    
    def _process_contract(self, data):
        """Contract-specific processing"""
        
        contract_data = {
            "parties": self._extract_contract_parties(data),
            "terms": self._extract_contract_terms(data),
            "dates": self._extract_important_dates(data),
            "amounts": self._extract_monetary_amounts(data),
            "signatures": self._detect_signature_areas(data),
            "clauses": self._extract_clauses(data)
        }
        
        return contract_data
    
    def _process_financial_document(self, data):
        """Financial document-specific processing"""
        
        financial_data = {
            "financial_tables": self._extract_financial_tables(data),
            "key_metrics": self._extract_financial_metrics(data),
            "charts": self._extract_financial_charts(data),
            "currency_amounts": self._extract_currency_amounts(data),
            "accounting_periods": self._extract_accounting_periods(data)
        }
        
        return financial_data
    
    def _extract_financial_tables(self, data):
        """Extract and analyze financial tables"""
        
        financial_tables = []
        
        for page in data.get("pages", []):
            for element in page.get("elements", []):
                if element.get("type") == "table":
                    table_data = element.get("data", [])
                    
                    # Check if this is a financial table
                    if self._is_financial_table(table_data):
                        processed_table = {
                            "raw_data": table_data,
                            "headers": table_data[0] if table_data else [],
                            "rows": table_data[1:] if len(table_data) > 1 else [],
                            "page": page.get("page_number"),
                            "bbox": element.get("bbox"),
                            "financial_metrics": self._parse_financial_metrics(table_data)
                        }
                        financial_tables.append(processed_table)
        
        return financial_tables
    
    def _is_financial_table(self, table_data):
        """Determine whether a table is financial"""
        
        if not table_data:
            return False
        
        # Look for finance-related keywords in headers
        headers = table_data[0] if table_data else []
        financial_keywords = [
            "revenue", "매출", "profit", "이익", "cost", "비용",
            "asset", "자산", "liability", "부채", "equity", "자본",
            "cash", "현금", "investment", "투자", "expense", "지출"
        ]
        
        header_text = " ".join(str(header).lower() for header in headers)
        
        return any(keyword in header_text for keyword in financial_keywords)

# Usage example
classifier = CorporateDocumentClassifier(chunkr)

# Process a variety of documents
documents = [
    "sales_contract.pdf",
    "quarterly_report.pdf", 
    "user_manual.pdf",
    "budget_proposal.xlsx"
]

for doc in documents:
    result = classifier.classify_and_process(doc)
    print(f"{doc}: {type(result).__name__}")

3. Multilingual Document Processing

# Multilingual document processing system
class MultilingualDocumentProcessor:
    def __init__(self, chunkr_instance):
        self.chunkr = chunkr_instance
        self.supported_languages = {
            "ko": "Korean",
            "en": "English", 
            "ja": "Japanese",
            "zh": "Chinese",
            "de": "German",
            "fr": "French",
            "es": "Spanish"
        }
    
    def process_multilingual_document(self, document_path):
        """Process a multilingual document"""
        
        # Basic document processing
        task = self.chunkr.upload(document_path)
        structured_data = task.json()
        
        # Language detection and analysis
        language_analysis = self._analyze_languages(structured_data)
        
        # Separate text by language
        language_segments = self._segment_by_language(structured_data, language_analysis)
        
        # Translation processing (if needed)
        translated_content = self._translate_content(language_segments)
        
        return {
            "original_data": structured_data,
            "language_analysis": language_analysis,
            "language_segments": language_segments,
            "translations": translated_content,
            "unified_content": self._create_unified_content(language_segments, translated_content)
        }
    
    def _analyze_languages(self, data):
        """Analyze languages present in the document"""
        
        from langdetect import detect, detect_langs
        
        language_stats = {}
        
        for page in data.get("pages", []):
            for element in page.get("elements", []):
                if element.get("type") in ["paragraph", "header"]:
                    text = element.get("text", "")
                    
                    if len(text.strip()) > 20:  # only analyze text of sufficient length
                        try:
                            detected_langs = detect_langs(text)
                            for lang_info in detected_langs:
                                lang_code = lang_info.lang
                                confidence = lang_info.prob
                                
                                if lang_code not in language_stats:
                                    language_stats[lang_code] = {
                                        "count": 0,
                                        "total_confidence": 0,
                                        "text_samples": []
                                    }
                                
                                language_stats[lang_code]["count"] += 1
                                language_stats[lang_code]["total_confidence"] += confidence
                                language_stats[lang_code]["text_samples"].append(text[:100])
                        
                        except:
                            continue  # skip on detection failure
        
        # Calculate average confidence
        for lang in language_stats:
            count = language_stats[lang]["count"]
            if count > 0:
                language_stats[lang]["avg_confidence"] = (
                    language_stats[lang]["total_confidence"] / count
                )
        
        return language_stats
    
    def _segment_by_language(self, data, language_analysis):
        """Separate text segments by language"""
        
        from langdetect import detect
        
        segments = {}
        
        for page in data.get("pages", []):
            for element in page.get("elements", []):
                if element.get("type") in ["paragraph", "header"]:
                    text = element.get("text", "")
                    
                    if len(text.strip()) > 10:
                        try:
                            detected_lang = detect(text)
                            
                            if detected_lang not in segments:
                                segments[detected_lang] = []
                            
                            segments[detected_lang].append({
                                "text": text,
                                "page": page.get("page_number"),
                                "bbox": element.get("bbox"),
                                "element_type": element.get("type")
                            })
                        
                        except:
                            # On detection failure, classify under the default language
                            default_lang = max(language_analysis.keys(), 
                                             key=lambda x: language_analysis[x]["count"])
                            
                            if default_lang not in segments:
                                segments[default_lang] = []
                            
                            segments[default_lang].append({
                                "text": text,
                                "page": page.get("page_number"),
                                "bbox": element.get("bbox"),
                                "element_type": element.get("type")
                            })
        
        return segments
    
    def _translate_content(self, language_segments, target_lang="en"):
        """Translate multilingual content"""
        
        # This simplified example uses the Google Translate API.
        # In practice, you can use DeepL, Azure Translator, or a local translation model.
        
        translated_content = {}
        
        for source_lang, segments in language_segments.items():
            if source_lang != target_lang:
                translated_segments = []
                
                for segment in segments:
                    # Call the translation API (example)
                    translated_text = self._call_translation_api(
                        segment["text"], 
                        source_lang, 
                        target_lang
                    )
                    
                    translated_segment = segment.copy()
                    translated_segment["translated_text"] = translated_text
                    translated_segment["source_language"] = source_lang
                    translated_segment["target_language"] = target_lang
                    
                    translated_segments.append(translated_segment)
                
                translated_content[f"{source_lang}_to_{target_lang}"] = translated_segments
        
        return translated_content
    
    def _call_translation_api(self, text, source_lang, target_lang):
        """Call the translation API (example)"""
        
        # In a real implementation, use the API of your chosen translation service.
        # This is a simple placeholder.
        
        return f"[TRANSLATED from {source_lang} to {target_lang}] {text}"

# Usage example
multilingual_processor = MultilingualDocumentProcessor(chunkr)

# Process a multilingual document
result = multilingual_processor.process_multilingual_document("multilingual_report.pdf")

print("Detected languages:")
for lang, stats in result["language_analysis"].items():
    print(f"  {lang}: {stats['count']} segments (avg confidence: {stats['avg_confidence']:.2f})")

print(f"\nTranslated content: {len(result['translations'])} language pairs")

Performance Optimization and Scaling

1. Batch Processing for Large Document Volumes

# Batch processing system for large document volumes
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import time

class BatchDocumentProcessor:
    def __init__(self, chunkr_instance, max_workers=5):
        self.chunkr = chunkr_instance
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    async def process_documents_batch(self, document_paths):
        """Asynchronous batch document processing"""
        
        # Create task queue
        tasks = []
        
        for doc_path in document_paths:
            task = asyncio.create_task(
                self._process_single_document_async(doc_path)
            )
            tasks.append(task)
        
        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Organize results
        processed_results = []
        failed_documents = []
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                failed_documents.append({
                    "document": document_paths[i],
                    "error": str(result)
                })
            else:
                processed_results.append({
                    "document": document_paths[i],
                    "result": result
                })
        
        return {
            "successful": processed_results,
            "failed": failed_documents,
            "total_processed": len(processed_results),
            "total_failed": len(failed_documents)
        }
    
    async def _process_single_document_async(self, doc_path):
        """Single document async processing"""
        
        loop = asyncio.get_event_loop()
        
        # Run CPU-intensive work in a separate thread
        result = await loop.run_in_executor(
            self.executor,
            self._process_document_sync,
            doc_path
        )
        
        return result
    
    def _process_document_sync(self, doc_path):
        """Synchronous document processing"""
        
        try:
            # Process document
            task = self.chunkr.upload(doc_path)
            
            # Generate results in multiple formats
            result = {
                "html": task.html(),
                "markdown": task.markdown(),
                "json": task.json(),
                "processing_time": time.time(),
                "file_size": self._get_file_size(doc_path)
            }
            
            return result
            
        except Exception as e:
            raise Exception(f"Failed to process {doc_path}: {str(e)}")
    
    def _get_file_size(self, file_path):
        """Calculate file size"""
        import os
        return os.path.getsize(file_path) if os.path.exists(file_path) else 0
    
    def process_with_priority_queue(self, documents_with_priority):
        """Priority-based document processing"""
        
        import heapq
        
        # Create priority queue (lower number = higher priority)
        priority_queue = []
        
        for priority, doc_path in documents_with_priority:
            heapq.heappush(priority_queue, (priority, doc_path))
        
        results = []
        
        while priority_queue:
            priority, doc_path = heapq.heappop(priority_queue)
            
            try:
                print(f"Processing priority {priority}: {doc_path}")
                result = self._process_document_sync(doc_path)
                results.append({
                    "priority": priority,
                    "document": doc_path,
                    "result": result,
                    "status": "success"
                })
                
            except Exception as e:
                results.append({
                    "priority": priority,
                    "document": doc_path,
                    "error": str(e),
                    "status": "failed"
                })
        
        return results

# Usage example
batch_processor = BatchDocumentProcessor(chunkr, max_workers=3)

# Asynchronous batch processing
document_list = [
    "document1.pdf",
    "document2.pptx", 
    "document3.docx",
    "document4.pdf"
]

# Asynchronous execution
async def run_batch_processing():
    results = await batch_processor.process_documents_batch(document_list)
    
    print(f"Successful: {results['total_processed']}")
    print(f"Failed: {results['total_failed']}")
    
    for failed in results['failed']:
        print(f"Failed document: {failed['document']} - {failed['error']}")

# Priority-based processing
priority_documents = [
    (1, "urgent_contract.pdf"),    # high priority
    (3, "quarterly_report.pdf"),   # medium priority
    (2, "legal_document.pdf"),     # medium-high priority
    (5, "training_manual.pdf")     # low priority
]

priority_results = batch_processor.process_with_priority_queue(priority_documents)

2. Memory and Performance Optimization

# Performance optimization manager
class PerformanceOptimizer:
    def __init__(self, chunkr_instance):
        self.chunkr = chunkr_instance
        self.performance_metrics = {}
    
    def optimize_for_large_documents(self, doc_path, max_memory_mb=2048):
        """Optimized processing for large documents"""
        
        import psutil
        import gc
        
        # Check initial memory state
        initial_memory = psutil.Process().memory_info().rss / 1024 / 1024
        
        # Check document size
        file_size = self._get_file_size_mb(doc_path)
        
        # Determine processing strategy based on memory
        if file_size > 100:  # over 100 MB
            strategy = "streaming"
        elif file_size > 50:  # over 50 MB
            strategy = "chunked"
        else:
            strategy = "standard"
        
        print(f"File size: {file_size:.1f}MB, strategy: {strategy}")
        
        start_time = time.time()
        
        try:
            if strategy == "streaming":
                result = self._streaming_process(doc_path, max_memory_mb)
            elif strategy == "chunked":
                result = self._chunked_process(doc_path, max_memory_mb)
            else:
                result = self._standard_process(doc_path)
            
            processing_time = time.time() - start_time
            peak_memory = psutil.Process().memory_info().rss / 1024 / 1024
            
            # Record performance metrics
            self.performance_metrics[doc_path] = {
                "file_size_mb": file_size,
                "processing_time": processing_time,
                "initial_memory_mb": initial_memory,
                "peak_memory_mb": peak_memory,
                "memory_delta_mb": peak_memory - initial_memory,
                "strategy": strategy,
                "throughput_mb_per_sec": file_size / processing_time if processing_time > 0 else 0
            }
            
            # Clean up memory
            gc.collect()
            
            return result
            
        except Exception as e:
            print(f"Optimization processing failed: {e}")
            raise
    
    def _streaming_process(self, doc_path, max_memory_mb):
        """Streaming processing (for large files)"""
        
        # Sequential page-by-page processing
        task = self.chunkr.upload(doc_path)
        
        # Process JSON results as a stream
        json_result = task.json()
        
        # Process page by page
        processed_pages = []
        
        for page in json_result.get("pages", []):
            # Process one page at a time to limit memory usage
            processed_page = self._process_single_page(page)
            processed_pages.append(processed_page)
            
            # Check memory usage
            current_memory = psutil.Process().memory_info().rss / 1024 / 1024
            if current_memory > max_memory_mb:
                # Save intermediate results and free memory
                self._save_intermediate_results(processed_pages, doc_path)
                processed_pages = []  # reset list
                gc.collect()
        
        return {
            "pages": processed_pages,
            "processing_method": "streaming"
        }
    
    def _chunked_process(self, doc_path, max_memory_mb):
        """Chunked processing (for medium-sized files)"""
        
        # Process document in multiple chunks
        task = self.chunkr.upload(doc_path)
        full_result = task.json()
        
        # Group pages into chunks
        pages = full_result.get("pages", [])
        chunk_size = max(1, len(pages) // 4)  # divide into 4 chunks
        
        processed_chunks = []
        
        for i in range(0, len(pages), chunk_size):
            chunk_pages = pages[i:i + chunk_size]
            
            # Process chunk
            chunk_result = self._process_page_chunk(chunk_pages)
            processed_chunks.append(chunk_result)
            
            # Intermediate cleanup
            if i % (chunk_size * 2) == 0:  # clean every 2 chunks
                gc.collect()
        
        return {
            "chunks": processed_chunks,
            "processing_method": "chunked"
        }
    
    def _standard_process(self, doc_path):
        """Standard processing (for regular-sized files)"""
        
        task = self.chunkr.upload(doc_path)
        
        return {
            "html": task.html(),
            "markdown": task.markdown(),
            "json": task.json(),
            "processing_method": "standard"
        }
    
    def get_performance_report(self):
        """Generate performance report"""
        
        if not self.performance_metrics:
            return "No performance data available"
        
        total_files = len(self.performance_metrics)
        total_size = sum(m["file_size_mb"] for m in self.performance_metrics.values())
        total_time = sum(m["processing_time"] for m in self.performance_metrics.values())
        avg_throughput = sum(m["throughput_mb_per_sec"] for m in self.performance_metrics.values()) / total_files
        
        report = f"""
        Performance Report
        ====================
        Files processed: {total_files}
        Total file size: {total_size:.1f}MB
        Total processing time: {total_time:.1f}s
        Average throughput: {avg_throughput:.2f}MB/s
        Overall throughput: {total_size/total_time:.2f}MB/s
        
        Per-file details:
        """
        
        for doc_path, metrics in self.performance_metrics.items():
            report += f"""
        {doc_path}:
          - Size: {metrics['file_size_mb']:.1f}MB
          - Time: {metrics['processing_time']:.1f}s
          - Throughput: {metrics['throughput_mb_per_sec']:.2f}MB/s
          - Memory increase: {metrics['memory_delta_mb']:.1f}MB
          - Strategy: {metrics['strategy']}
        """
        
        return report

# Usage example
optimizer = PerformanceOptimizer(chunkr)

# Optimized processing for large documents
large_documents = [
    "large_report_150mb.pdf",
    "huge_manual_300mb.pdf",
    "massive_dataset_500mb.xlsx"
]

for doc in large_documents:
    try:
        result = optimizer.optimize_for_large_documents(doc, max_memory_mb=4096)
        print(f"{doc} processed successfully")
    except Exception as e:
        print(f"{doc} processing failed: {e}")

# Output performance report
print(optimizer.get_performance_report())

License and Deployment Options

Open Source vs. Commercial Service Comparison

Chunkr offers flexible license options for a variety of use cases:

Feature Open Source (AGPL-3.0) Commercial API Enterprise
Target Development and testing Production applications Large-scale/high-security deployments
Layout analysis Basic models Advanced models Advanced + custom tuning
OCR accuracy Standard models Premium models Premium + domain tuning
VLM processing Basic vision models Enhanced VLM models Enhanced + custom fine-tuning
Excel support Not included Native parser Native parser
Infrastructure Self-hosted Fully managed Fully managed (on-prem/cloud)
Support Discord community Priority email + community 24/7 dedicated team

Real-World Deployment Scenarios

# Deployment environment configuration management
class DeploymentManager:
    def __init__(self):
        self.deployment_configs = {
            "development": {
                "mode": "self_hosted",
                "docker_compose": "compose.yaml",
                "gpu_support": False,
                "scaling": "single_instance",
                "monitoring": "basic"
            },
            "staging": {
                "mode": "hybrid",
                "docker_compose": "compose.yaml + compose.cpu.yaml",
                "gpu_support": True,
                "scaling": "horizontal",
                "monitoring": "detailed"
            },
            "production": {
                "mode": "commercial_api",
                "endpoint": "https://api.chunkr.ai",
                "gpu_support": True,
                "scaling": "auto",
                "monitoring": "enterprise"
            },
            "enterprise": {
                "mode": "on_premise",
                "docker_compose": "enterprise.yaml",
                "gpu_support": True,
                "scaling": "kubernetes",
                "monitoring": "full_observability"
            }
        }
    
    def setup_environment(self, env_type):
        """Apply environment-specific configuration"""
        
        config = self.deployment_configs.get(env_type)
        if not config:
            raise ValueError(f"Unsupported environment: {env_type}")
        
        if config["mode"] == "self_hosted":
            return self._setup_self_hosted(config)
        elif config["mode"] == "commercial_api":
            return self._setup_commercial_api(config)
        elif config["mode"] == "enterprise":
            return self._setup_enterprise(config)
    
    def _setup_self_hosted(self, config):
        """Self-hosted environment setup"""
        
        return f"""
        # Self-hosted setup
        git clone https://github.com/lumina-ai-inc/chunkr
        cd chunkr
        
        # Configure environment
        cp .env.example .env
        cp models.example.yaml models.yaml
        
        # Run Docker Compose
        docker compose -f {config['docker_compose']} up -d
        
        # Access
        Web UI: http://localhost:5173
        API: http://localhost:8000
        """
    
    def _setup_commercial_api(self, config):
        """Commercial API setup"""
        
        return f"""
        # Commercial API setup
        pip install chunkr-ai
        
        # Usage code
        from chunkr_ai import Chunkr
        
        chunkr = Chunkr(api_key="your_api_key_from_chunkr_ai")
        
        # API endpoint: {config['endpoint']}
        """
    
    def _setup_enterprise(self, config):
        """Enterprise environment setup"""
        
        return f"""
        # Enterprise environment setup
        # 1. Configure dedicated infrastructure
        # 2. Apply security settings
        # 3. Configure {config['scaling']} scaling
        # 4. Install {config['monitoring']} monitoring
        
        # Kubernetes deployment (example)
        kubectl apply -f k8s/chunkr-enterprise.yaml
        """

# Performance comparison and selection guide
deployment_guide = """
# Chunkr Deployment Guide

## 1. Development and Testing Phase
- **Recommended**: Open source self-hosting
- **Reason**: Free, customizable, for learning purposes
- **Limitations**: Basic models, no Excel support

## 2. Small-Scale Production
- **Recommended**: Commercial API
- **Reason**: No management overhead, advanced features, stability
- **Cost**: Usage-based billing

## 3. Large-Scale Production
- **Recommended**: Enterprise
- **Reason**: Dedicated support, customization, enhanced security
- **Features**: On-premises or dedicated cloud

## 4. Hybrid Approach
- **Development**: Open source local environment
- **Testing**: Validate with commercial API
- **Production**: Enterprise deployment
"""

print(deployment_guide)

Automated Testing and Validation

Integration Test Script

#!/bin/bash
# test-chunkr.sh

set -e

# Color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

log_info() {
    echo -e "${BLUE}[INFO]${NC} $1"
}

log_success() {
    echo -e "${GREEN}[SUCCESS]${NC} $1"
}

log_warning() {
    echo -e "${YELLOW}[WARNING]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

PROJECT_DIR="$HOME/ai-projects/chunkr"

echo "🚀 Chunkr environment test starting"
echo "=============================="

# 1. Check system requirements
log_info "Checking system information..."
echo "📱 OS: $(uname -s) $(uname -r)"
echo "🐍 Python: $(python3 --version 2>/dev/null || echo 'Python3 not found')"
echo "🐳 Docker: $(docker --version 2>/dev/null || echo 'Docker not found')"
echo "📦 Docker Compose: $(docker compose version 2>/dev/null || echo 'Docker Compose not found')"

# 2. Clone and set up project
log_info "Setting up project..."
mkdir -p "$PROJECT_DIR"
cd "$PROJECT_DIR"

if [ ! -d ".git" ]; then
    log_info "Cloning Chunkr project..."
    git clone https://github.com/lumina-ai-inc/chunkr.git .
fi

# 3. Configure environment files
if [ ! -f ".env" ]; then
    log_info "Configuring environment file..."
    cp .env.example .env
fi

if [ ! -f "models.yaml" ]; then
    log_info "Creating model configuration file..."
    cp models.example.yaml models.yaml
fi

# 4. Check Docker environment
if command -v docker &> /dev/null && command -v docker compose &> /dev/null; then
    log_info "Testing Docker environment..."
    
    # Check GPU support
    if command -v nvidia-smi &> /dev/null; then
        log_info "NVIDIA GPU detected, running in GPU mode"
        COMPOSE_FILES="compose.yaml"
    elif [[ $(uname -s) == "Darwin" ]] && [[ $(uname -m) == "arm64" ]]; then
        log_info "Apple Silicon detected, running in MAC ARM mode"
        COMPOSE_FILES="compose.yaml -f compose.cpu.yaml -f compose.mac.yaml"
    else
        log_info "Running in CPU mode"
        COMPOSE_FILES="compose.yaml -f compose.cpu.yaml"
    fi
    
    # Run Docker Compose
    log_info "Starting Chunkr services..."
    docker compose -f $COMPOSE_FILES up -d
    
    # Wait for services to be ready
    log_info "Waiting for service initialization..."
    sleep 30
    
    # Health check
    log_info "Checking service status..."
    for i in {1..10}; do
        if curl -s http://localhost:8000/health > /dev/null; then
            log_success "API server confirmed operational"
            break
        elif [ $i -eq 10 ]; then
            log_error "API server not responding"
            docker compose -f $COMPOSE_FILES logs api
        else
            echo "Waiting... ($i/10)"
            sleep 5
        fi
    done
    
    if curl -s http://localhost:5173 > /dev/null; then
        log_success "Web UI confirmed operational"
    else
        log_warning "Web UI not responding"
    fi
    
else
    log_warning "Docker not installed, running Python SDK tests only"
fi

# 5. Python SDK test
log_info "Testing Python SDK..."

# Create virtual environment
if [ ! -d "chunkr-env" ]; then
    python3 -m venv chunkr-env
fi

source chunkr-env/bin/activate

# Install SDK
pip install --upgrade pip
pip install chunkr-ai requests

# Create SDK test script
cat > test_chunkr_sdk.py << 'EOF'
#!/usr/bin/env python3
"""
Chunkr SDK feature tests
"""

import sys
import os
import tempfile
import requests

def test_imports():
    """Package import test"""
    print("📦 SDK import test...")
    
    try:
        from chunkr_ai import Chunkr
        print("  ✅ chunkr-ai package")
        return True
    except ImportError as e:
        print(f"  ❌ chunkr-ai import failed: {e}")
        return False

def test_local_api():
    """Local API connection test"""
    print("\n🔌 Local API connection test...")
    
    try:
        response = requests.get("http://localhost:8000/health", timeout=5)
        if response.status_code == 200:
            print("  ✅ Local API server responding")
            return True
        else:
            print(f"  ❌ API server error: {response.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        print(f"  ❌ API connection failed: {e}")
        return False

def test_sample_document():
    """Sample document processing test"""
    print("\n📄 Sample document processing test...")
    
    # Create a simple text file
    sample_content = """
    # Test Document
    
    ## Overview
    This is a sample document for Chunkr testing.
    
    ## Content
    - Item 1: Layout analysis test
    - Item 2: OCR feature test
    - Item 3: Semantic chunking test
    
    ## Conclusion
    All features should work correctly.
    """
    
    try:
        # Create a temporary file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write(sample_content)
            temp_file = f.name
        
        print(f"  📝 Temporary file created: {temp_file}")
        
        # Use local API (test possible even without Cloud API key)
        if test_local_api():
            print("  💡 Local API available, running actual test")
            # In a real implementation, use the local API endpoint
            print("  ✅ Sample document processing test passed (simulated)")
        else:
            print("  💡 No local API, Cloud API key required")
            print("  ℹ️ Test possible after obtaining API key from chunkr.ai")
        
        # Clean up temporary file
        os.unlink(temp_file)
        
        return True
        
    except Exception as e:
        print(f"  ❌ Sample document processing failed: {e}")
        return False

def main():
    print("🧪 Chunkr SDK Test\n")
    
    tests = [
        ("Package Import", test_imports),
        ("Local API Connection", test_local_api),
        ("Sample Document Processing", test_sample_document)
    ]
    
    passed = 0
    for name, test_func in tests:
        try:
            if test_func():
                passed += 1
                print(f"✅ {name}: PASS")
            else:
                print(f"❌ {name}: FAIL")
        except Exception as e:
            print(f"❌ {name}: ERROR - {e}")
    
    print(f"\n📊 Test results: {passed}/{len(tests)} passed")
    
    if passed == len(tests):
        print("\n🎉 All tests passed!")
        print("💡 You can now start processing documents with Chunkr.")
    else:
        print(f"\n⚠️ {len(tests)-passed} test(s) failed")
        print("💡 Check the failures and reconfigure your environment.")
    
    return passed == len(tests)

if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)
EOF

# Run Python tests
log_info "Running SDK feature tests..."
if python test_chunkr_sdk.py; then
    log_success "SDK tests passed!"
else
    log_warning "Some SDK tests failed"
fi

# 6. Usage guide
echo ""
echo "🎯 Chunkr Usage Guide:"
echo "====================="
echo "1. Project directory:"
echo "   cd $PROJECT_DIR"
echo ""
echo "2. Access Web UI:"
echo "   http://localhost:5173"
echo ""
echo "3. API documentation:"
echo "   http://localhost:8000/docs"
echo ""
echo "4. Python SDK usage:"
echo "   source chunkr-env/bin/activate"
echo "   python"
echo '   >>> from chunkr_ai import Chunkr'
echo '   >>> chunkr = Chunkr(api_key="your_key_or_use_local")'
echo ""
echo "5. Stop services:"
echo "   docker compose -f $COMPOSE_FILES down"

echo ""
echo "💡 Key Features:"
echo "============="
echo "• Advanced layout analysis for accurate document structure detection"
echo "• Precise OCR + bounding boxes for text position tracking"
echo "• Semantic chunking for RAG system optimization"
echo "• Supports a wide range of formats: PDF, PPT, Word, Excel"
echo "• Flexible options: open source or commercial service"

log_success "Chunkr environment setup complete!"
echo "📁 Project location: $PROJECT_DIR"
echo "🚀 Start processing documents with document intelligence now!"

Conclusion

Chunkr is a platform that changes the paradigm for document-based AI applications. Key outcomes and values:

Core Innovations

  1. Intelligent document understanding: goes beyond plain text extraction to capture structure, meaning, and context
  2. RAG system optimization: semantic chunking for improved retrieval accuracy and context preservation
  3. Broad format support: consistent quality processing across PDF, PPT, Word, and Excel
  4. Scalable architecture: addresses every scale from development to enterprise

Technical Differentiation

  • Vision-based layout analysis: accurately identifies the structural relationships among tables, images, and charts
  • Precise OCR + bounding boxes: preserves positional and typographic information of text
  • Adaptive chunking strategy: optimized splitting based on content type
  • LLM integration: seamless integration with a variety of AI models

Practical Value

  1. Research and academia: systematic analysis and knowledge extraction from papers and reports
  2. Corporate document management: automatic classification and key information extraction from contracts and financial statements
  3. Content production: convert complex documents to RAG-ready data
  4. Multilingual processing: language-specific analysis and translation support for global documents

Open Workflow Ecosystem

  • Flexible deployment options: from self-hosting to enterprise
  • Open source community: transparent development under AGPL-3.0 license
  • Commercial service: managed solutions for production environments
  • Scalability: cloud-native architecture based on Docker and Kubernetes

Looking Ahead

Chunkr enables anyone to build high-quality document intelligence applications. For developers and companies working with non-English documents, Chunkr offers language-specific processing support, accessible APIs for complex document AI workflows, cost-efficient open-source foundations, and customizable domain-specific models.

Transform your document repository into an intelligent knowledge base with Chunkr and maximize the performance of your RAG systems.


References