Async Batch Processing Pipelines for Indoor Mapping & Wayfinding

Indoor mapping operations routinely ingest hundreds of heterogeneous floor plans per deployment cycle. Synchronous processing models introduce unacceptable latency, block event loops during heavy geometry operations, and create cascading failures when upstream CAD/BIM sources throttle throughput. A production-grade async batch architecture decouples ingestion, parsing, vectorization, and topology construction into discrete, non-blocking stages. Within the broader Automated Floor Plan Parsing & Vectorization ecosystem, the pipeline must enforce strict memory boundaries, handle backpressure gracefully, and maintain deterministic output schemas for downstream GIS consumption.

Pipeline Architecture & Ingestion Strategy

The ingestion layer operates as a coroutine-driven file scanner that populates a bounded asyncio.Queue. Because put() suspends the scanner whenever the queue is full, limiting queue depth to 2 * worker_count bounds memory use when scanning network-mounted directories or cloud storage buckets, however many files the scan discovers. Each queued task carries a lightweight manifest: file path, source format, floor identifier, and a UUID for distributed tracing. This manifest-driven approach keeps heavy I/O isolated from CPU-bound vectorization routines, so the event loop remains responsive while workers process geometry.
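The following self-contained sketch shows the backpressure behavior described above. It uses string placeholders instead of real TaskManifest objects, and a short sleep stands in for processing. The producer suspends on put() whenever the queue holds 2 * worker_count items, so the observed peak never exceeds that bound. demo_backpressure is a hypothetical helper name.

```python
import asyncio


async def demo_backpressure(worker_count: int = 2, items: int = 10) -> int:
    """Return the largest queue size seen while a fast producer feeds slow consumers."""
    # Queue depth of 2 * worker_count, as described above.
    queue: asyncio.Queue = asyncio.Queue(maxsize=2 * worker_count)
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(items):
            await queue.put(f"manifest-{i}")  # suspends while the queue is full
            peak = max(peak, queue.qsize())
        for _ in range(worker_count):
            await queue.put(None)  # one end-of-stream sentinel per consumer

    async def consumer() -> None:
        while True:
            item = await queue.get()
            try:
                if item is None:
                    return
                await asyncio.sleep(0.01)  # stand-in for slow processing
            finally:
                queue.task_done()

    await asyncio.gather(producer(), *(consumer() for _ in range(worker_count)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(demo_backpressure()))
```

The peak stays at or below 2 * worker_count regardless of how many items are produced. Memory is bounded by queue depth, not by directory size.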

Format-specific ingestion requires tailored extraction logic. For instance, rasterized PDFs and vector-based DWG files demand different read strategies before they can enter the unified processing stream. Refer to SVG/DWG Parsing Workflows for format-specific normalization routines that should execute prior to queue insertion.
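A suffix-keyed dispatch table is one way to keep format-specific logic ahead of the queue. This is a minimal sketch built on assumptions: normalize_svg, normalize_dwg and normalize_pdf are hypothetical placeholders that return a small dict, whereas real ones would wrap the routines from SVG/DWG Parsing Workflows. IFC is omitted for brevity.

```python
from pathlib import Path
from typing import Callable, Dict


# Hypothetical normalizers; real ones would call an SVG parser, a DWG
# converter, or a PDF raster/vector extractor and return richer metadata.
def normalize_svg(path: Path) -> dict:
    return {"path": str(path), "format": "svg", "kind": "vector"}


def normalize_dwg(path: Path) -> dict:
    return {"path": str(path), "format": "dwg", "kind": "vector"}


def normalize_pdf(path: Path) -> dict:
    return {"path": str(path), "format": "pdf", "kind": "raster"}


# Suffix -> normalizer; keys are stored lowercase.
NORMALIZERS: Dict[str, Callable[[Path], dict]] = {
    ".svg": normalize_svg,
    ".dwg": normalize_dwg,
    ".pdf": normalize_pdf,
}


def normalize_for_queue(path: Path) -> dict:
    """Run the format-specific normalizer before a manifest is enqueued."""
    normalizer = NORMALIZERS.get(path.suffix.lower())
    if normalizer is None:
        raise ValueError(f"unsupported format: {path.suffix!r}")
    return normalizer(path)
```

If a real normalizer is expensive, call it through loop.run_in_executor rather than directly inside the scanner coroutine. Otherwise it blocks the event loop just as geometry kernels would.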

Asynchronous Worker Orchestration & Concurrency Control

Floor plan processing exhibits a hybrid workload profile. File I/O and database writes are naturally asynchronous, while geometric operations (polygonization, snapping, intersection testing) are CPU-bound. asyncio runs all coroutines on a single thread, so it cannot parallelize CPU work on its own, and heavy computation must be delegated to an executor. A ProcessPoolExecutor gives true parallelism for Python-level geometry code. A thread pool helps only when the underlying native library releases the GIL. The architectural blueprint outlined in Building async pipelines for batch floor plan processing demonstrates how to bridge this gap using loop.run_in_executor paired with a ProcessPoolExecutor for heavy geometry kernels.
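The sketch below makes that bridge concrete. It counts how many heartbeat ticks the event loop completes while a blocking call runs, first directly on the loop and then routed through loop.run_in_executor. Two assumptions apply. First, time.sleep stands in for a geometry kernel. Second, a ThreadPoolExecutor is used only because sleep releases the GIL; for pure-Python CPU work you would pass a ProcessPoolExecutor instead. heartbeat_ticks is a hypothetical helper.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_kernel(seconds: float) -> str:
    # Stand-in for a geometry kernel. time.sleep releases the GIL, so a thread
    # pool is enough here; pure-Python CPU work would need a ProcessPoolExecutor.
    time.sleep(seconds)
    return "done"


async def heartbeat_ticks(offload: bool, seconds: float = 0.3) -> int:
    """Count heartbeat ticks the event loop completes while the kernel runs."""
    ticks = 0
    stop = asyncio.Event()

    async def heartbeat() -> None:
        nonlocal ticks
        while not stop.is_set():
            ticks += 1
            await asyncio.sleep(0.05)

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat take its first tick
    before = ticks
    if offload:
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=1) as pool:
            await loop.run_in_executor(pool, blocking_kernel, seconds)
    else:
        blocking_kernel(seconds)  # runs on the loop thread: nothing else can run
    during = ticks - before
    stop.set()
    await hb
    return during
```

Calling the kernel directly yields zero ticks, because the loop is frozen. The offloaded version lets the heartbeat keep running, which is the responsiveness the pipeline depends on.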

Concurrency control is enforced via asyncio.Semaphore. Without explicit throttling, workers can exhaust file descriptors, saturate disk I/O bandwidth, or trigger OS-level OOM kills when processing multi-story IFC exports. A semaphore initialized to the available core count (or a tuned fraction of it in containerized deployments) caps the number of vectorization tasks that run concurrently. For detailed tuning strategies on worker pool sizing and memory footprint reduction, consult Optimizing Python async workers for parsing.
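The following sketch sizes the semaphore from available cores. The 0.75 fraction is an illustrative assumption, not a figure from this document. os.sched_getaffinity is Linux-only and respects CPU pinning but not cgroup CPU quotas such as Docker's --cpus, which is one reason to apply a fraction; on other platforms the code falls back to os.cpu_count(). cpu_slots and run_throttled are hypothetical names.

```python
import asyncio
import os


def cpu_slots(fraction: float = 0.75) -> int:
    """Usable CPU count scaled by fraction, never below 1."""
    if hasattr(os, "sched_getaffinity"):
        available = len(os.sched_getaffinity(0))  # Linux: honors taskset/cpusets
    else:
        available = os.cpu_count() or 1
    return max(1, int(available * fraction))


async def run_throttled(n_tasks: int, limit: int) -> int:
    """Run n_tasks coroutines behind a semaphore and return peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def vectorize(i: int) -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for executor-delegated work
            active -= 1

    await asyncio.gather(*(vectorize(i) for i in range(n_tasks)))
    return peak


if __name__ == "__main__":
    limit = cpu_slots()
    print(limit, asyncio.run(run_throttled(20, limit)))
```

However many tasks are scheduled, the peak never exceeds the semaphore's initial value.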

Production Implementation: Runnable Pipeline

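The following implementation is a complete, runnable async batch processor. It integrates bounded queues, semaphore-controlled concurrency, process pool delegation for CPU-bound geometry, bounded retries, and graceful shutdown. The geometry kernel is a simulated placeholder to be replaced with real parsing routines, and aiofiles is a third-party dependency (pip install aiofiles).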

import asyncio
import json
import logging
import time
import uuid
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional

import aiofiles  # third-party dependency: pip install aiofiles

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
logger = logging.getLogger("indoor_pipeline")

SUPPORTED_SUFFIXES = {".dwg", ".svg", ".pdf", ".ifc"}
MAX_RETRIES = 2  # retries after the first attempt (3 attempts total)

# --- Data Structures ---
@dataclass
class TaskManifest:
    file_path: Path
    source_format: str
    floor_id: str
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    retry_count: int = 0

# --- CPU-Bound Geometry Kernel (runs in a separate process) ---
def process_geometry_kernel(file_path: str, source_format: str) -> Dict[str, Any]:
    """
    Simulates heavy CPU-bound geometry processing.
    Replace with actual Shapely/GeoPandas/IfcOpenShell routines.
    Must remain a module-level function so ProcessPoolExecutor can pickle it.
    """
    logger.info(f"[CPU Kernel] Processing {file_path} ({source_format})")
    time.sleep(0.5)  # Simulate polygonization, snapping, topology validation
    return {
        "status": "success",
        "geometry_count": 142,
        "topology_valid": True,
        "floor_id": Path(file_path).stem.split("_")[0]
    }

# --- Async Pipeline Components ---
class AsyncBatchPipeline:
    def __init__(self, max_workers: int = 4, queue_depth: int = 8):
        # Must be constructed inside a running event loop (see main()).
        self.max_workers = max_workers
        self.queue_depth = queue_depth
        # None is the end-of-stream sentinel, hence Optional.
        self.task_queue: asyncio.Queue[Optional[TaskManifest]] = asyncio.Queue(maxsize=queue_depth)
        # Caps concurrent kernel runs; it only bites if more worker coroutines
        # are started than CPU slots (e.g., to overlap export I/O).
        self.semaphore = asyncio.Semaphore(max_workers)
        self.executor = ProcessPoolExecutor(max_workers=max_workers)
        self.loop = asyncio.get_running_loop()

    async def ingest_scanner(self, scan_dir: Path) -> None:
        """Directory scanner with backpressure."""
        logger.info(f"Starting ingestion scan: {scan_dir}")
        # rglob() is synchronous; on slow network mounts it can stall the loop.
        for fp in scan_dir.rglob("*"):
            if fp.is_file() and fp.suffix.lower() in SUPPORTED_SUFFIXES:
                manifest = TaskManifest(
                    file_path=fp,
                    source_format=fp.suffix.lstrip(".").lower(),
                    floor_id=fp.stem.split("_")[0]
                )
                # put() suspends until the queue has space (backpressure)
                await self.task_queue.put(manifest)
                logger.debug(f"Enqueued: {fp.name} | Queue size: {self.task_queue.qsize()}")
        # One sentinel per worker so every worker exits cleanly
        for _ in range(self.max_workers):
            await self.task_queue.put(None)

    async def worker(self, worker_id: int) -> None:
        """Drains the queue until it receives a sentinel."""
        while True:
            manifest = await self.task_queue.get()
            try:
                if manifest is None:
                    return
                await self._process_with_retries(worker_id, manifest)
            finally:
                self.task_queue.task_done()

    async def _process_with_retries(self, worker_id: int, manifest: TaskManifest) -> None:
        """Respects the semaphore, delegates CPU work, retries in place."""
        # Retrying in place avoids re-enqueueing into a full bounded queue,
        # which deadlocks if every worker blocks on put() at the same time.
        while True:
            async with self.semaphore:
                try:
                    logger.info(f"Worker-{worker_id} | Processing {manifest.file_path.name}")

                    # Delegate CPU-bound work to the process pool
                    result = await self.loop.run_in_executor(
                        self.executor,
                        process_geometry_kernel,
                        str(manifest.file_path),
                        manifest.source_format
                    )

                    # Async export / DB write
                    await self._export_to_gis(result, manifest.trace_id)
                    logger.info(f"Worker-{worker_id} | Completed {manifest.file_path.name}")
                    return
                except Exception as e:  # CancelledError is not an Exception subclass
                    logger.error(f"Worker-{worker_id} | Failed {manifest.file_path.name} "
                                 f"(attempt {manifest.retry_count + 1}): {e}")
            if manifest.retry_count >= MAX_RETRIES:
                logger.error(f"Worker-{worker_id} | Giving up on {manifest.file_path.name}")
                return
            manifest.retry_count += 1
            await asyncio.sleep(0.5 * manifest.retry_count)  # linear backoff, outside the semaphore

    async def _export_to_gis(self, geo_result: Dict[str, Any], trace_id: str) -> None:
        """Simulate async GIS export / topology registration."""
        record = {**geo_result, "trace_id": trace_id}
        async with aiofiles.open(f"output_{trace_id}.json", "w") as f:
            # Valid JSON with sorted keys keeps the output schema deterministic
            await f.write(json.dumps(record, sort_keys=True))

    async def run(self, scan_dir: Path) -> None:
        """Orchestrate pipeline lifecycle."""
        workers = [asyncio.create_task(self.worker(i)) for i in range(self.max_workers)]
        try:
            # Scan in this task while workers drain the queue concurrently
            await self.ingest_scanner(scan_dir)
            # Each worker returns after consuming its sentinel
            await asyncio.gather(*workers)
        finally:
            # On error, stop any workers still running; no-op for finished ones
            for w in workers:
                w.cancel()
            await asyncio.gather(*workers, return_exceptions=True)
            self.executor.shutdown(wait=True)  # blocks briefly; acceptable at exit
        logger.info("Pipeline completed successfully.")

# --- Entry Point ---
async def main() -> None:
    scan_directory = Path("./floor_plans_input")
    scan_directory.mkdir(exist_ok=True)

    # Dummy files named <floor_id>_<rest>.dwg, so floor_id parses as "L00", "L01", ...
    for i in range(12):
        (scan_directory / f"L{i:02d}_plan.dwg").touch()

    # queue_depth = 2 * max_workers, matching the sizing rule above
    pipeline = AsyncBatchPipeline(max_workers=4, queue_depth=8)
    await pipeline.run(scan_directory)

if __name__ == "__main__":
    # Guard required: ProcessPoolExecutor may re-import this module in child processes
    asyncio.run(main())

Downstream Topology & GIS Integration

Once vectorized, parsed floor plans feed directly into spatial analysis engines. The pipeline’s deterministic output schema ensures seamless handoff to Wall & Door Detection Algorithms, which rely on clean, non-overlapping polygon geometries and consistent attribute mapping.
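Determinism here means that the same result always produces byte-identical output. In this minimal sketch, REQUIRED_FIELDS is a hypothetical schema that mirrors the result dict returned by process_geometry_kernel in the implementation above. The function validates the required fields, then serializes with sorted keys and fixed separators so the output does not depend on dict insertion order.

```python
import json
from typing import Any, Dict

# Hypothetical minimal schema: field name -> required Python type.
REQUIRED_FIELDS: Dict[str, type] = {
    "status": str,
    "geometry_count": int,
    "topology_valid": bool,
    "floor_id": str,
}


def serialize_floor_result(result: Dict[str, Any]) -> str:
    """Validate required fields, then emit byte-stable JSON."""
    for name, expected in REQUIRED_FIELDS.items():
        if name not in result:
            raise ValueError(f"missing field: {name}")
        value = result[name]
        # bool is a subclass of int, so reject True/False where an int is required
        if not isinstance(value, expected) or (expected is int and isinstance(value, bool)):
            raise TypeError(f"{name} must be {expected.__name__}")
    # sort_keys + fixed separators remove dependence on insertion order and spacing
    return json.dumps(result, sort_keys=True, separators=(",", ":"))
```

Two dicts with the same contents but different key order serialize to the same string. Downstream consumers can therefore diff or hash outputs safely.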

For large-scale campus deployments or multi-story BIM environments, scaling strategies must account for IFC schema complexity and spatial reference transformations. See Scaling Python workers for IFC parsing for memory-mapped IFC extraction patterns that prevent worker thrashing.

The pipeline’s trace IDs enable real-time topology updates in wayfinding engines. By publishing completion events to a message broker (e.g., Redis Streams or RabbitMQ), indoor navigation teams can trigger incremental graph rebuilds without requiring full dataset reloads.
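The following broker-agnostic sketch shows the completion-event flow. The EventPublisher protocol and the event shape (event name, trace_id, floor_id, timestamp) are hypothetical. InMemoryPublisher is a test double built on asyncio.Queue; a Redis Streams version might wrap XADD (for example via redis.asyncio), but that wiring is assumed and not shown. On the consumer side, floors_to_rebuild collapses pending events into the set of floors whose navigation graph needs an incremental rebuild.

```python
import asyncio
import time
from typing import Dict, Protocol, Set


class EventPublisher(Protocol):
    async def publish(self, stream: str, payload: dict) -> None: ...


class InMemoryPublisher:
    """Test double: one asyncio.Queue per stream name (stand-in for a real broker)."""

    def __init__(self) -> None:
        self.streams: Dict[str, asyncio.Queue] = {}

    async def publish(self, stream: str, payload: dict) -> None:
        queue = self.streams.get(stream)
        if queue is None:
            queue = self.streams[stream] = asyncio.Queue()
        await queue.put(payload)


async def announce_floor_ready(publisher: EventPublisher, trace_id: str, floor_id: str) -> None:
    # Hypothetical event shape; trace_id ties the event back to pipeline logs.
    event = {"event": "floor.vectorized", "trace_id": trace_id,
             "floor_id": floor_id, "ts": time.time()}
    await publisher.publish("floorplan.completed", event)


def floors_to_rebuild(queue: asyncio.Queue) -> Set[str]:
    """Drain pending events; each affected floor is rebuilt once, not the whole graph."""
    floors: Set[str] = set()
    while not queue.empty():
        floors.add(queue.get_nowait()["floor_id"])
    return floors


async def demo_incremental_rebuild() -> Set[str]:
    pub = InMemoryPublisher()
    for trace_id, floor_id in [("t1", "L01"), ("t2", "L02"), ("t3", "L01")]:
        await announce_floor_ready(pub, trace_id, floor_id)
    return floors_to_rebuild(pub.streams["floorplan.completed"])
```

Three completion events touching two floors trigger only two graph rebuilds.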

Operational Troubleshooting & Runbook

Symptom: High latency spikes; with asyncio debug mode enabled, "Executing <Task ...> took N seconds" warnings
Root cause: CPU-bound geometry running on the event-loop thread
Resolution: Route all heavy operations through loop.run_in_executor with a ProcessPoolExecutor. Never call Shapely or IfcOpenShell synchronously inside async functions.

Symptom: OSError: [Errno 24] Too many open files
Root cause: Unbounded concurrent file handles
Resolution: Lower max_workers, raise the OS limit (ulimit -n), and open every aiofiles handle in an async with block so it is always closed. Use asyncio.Semaphore to cap concurrent I/O.

Symptom: Queue starvation / idle workers
Root cause: Scanner coroutine yielding too slowly or blocking on network mounts
Resolution: Implement async directory traversal (aiopath or aiofiles.os.scandir), add timeout guards, and verify network storage latency.

Symptom: MemoryError / OOM kills during IFC processing
Root cause: Process pool workers loading entire BIM models into RAM
Resolution: Switch to chunked IFC parsing, use multiprocessing.shared_memory for large arrays, and enforce strict queue_depth limits.

Symptom: CancelledError propagating unexpectedly
Root cause: Improper task cancellation during graceful shutdown
Resolution: Use asyncio.gather(*tasks, return_exceptions=True) and wrap critical export steps in asyncio.shield() to prevent partial writes.
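For the queue-starvation case, here is one stdlib-only option, swapped in for aiopath/aiofiles. It runs os.scandir in a worker thread via asyncio.to_thread (Python 3.9+) and bounds the call with asyncio.wait_for. list_dir_with_timeout is a hypothetical helper. On timeout the awaiting coroutine receives a TimeoutError, but the thread itself keeps running until the underlying system call returns, because Python threads cannot be cancelled.

```python
import asyncio
import os
from pathlib import Path
from typing import List


async def list_dir_with_timeout(path: Path, timeout: float = 5.0) -> List[Path]:
    """List regular files in a directory off the event loop, failing fast if the mount hangs."""

    def _scan() -> List[Path]:
        # Runs in a worker thread: a slow NFS/SMB mount stalls this thread, not the loop.
        with os.scandir(path) as entries:
            return [Path(e.path) for e in entries if e.is_file()]

    return await asyncio.wait_for(asyncio.to_thread(_scan), timeout=timeout)
```

The scanner can call this per directory and log or skip directories that time out, so workers do not sit idle behind one unresponsive mount.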

Proactive Monitoring:

  • Track queue.qsize() vs queue.maxsize to detect backpressure bottlenecks.
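  • Log executor task duration percentiles. If P95 exceeds your latency budget (for example, 5 s), check whether the time goes to computation or to waiting for a free pool worker. Optimize the geometry kernels in the first case; add ProcessPoolExecutor workers only if spare cores are available in the second.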
  • Use tracemalloc in staging to identify memory leaks in long-running worker processes.
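Here is a sketch of the first two signals. ExecutorMetrics and queue_fill_ratio are hypothetical helpers, not any particular metrics library. P95 is computed with statistics.quantiles(n=20), whose 19th cut point is the 95th percentile. The recorded duration includes time spent waiting for a free pool worker, which matters when deciding between optimizing kernels and adding workers. A production setup would more likely export histograms to a monitoring system.

```python
import asyncio
import statistics
import time
from concurrent.futures import Executor
from typing import Any, Callable, List


class ExecutorMetrics:
    """Records wall-clock duration of each executor call (sketch, not a metrics library)."""

    def __init__(self) -> None:
        self.durations: List[float] = []

    async def timed(self, executor: Executor, fn: Callable[..., Any], *args: Any) -> Any:
        loop = asyncio.get_running_loop()
        start = time.perf_counter()
        try:
            return await loop.run_in_executor(executor, fn, *args)
        finally:
            # Includes queueing for a free pool worker, not just compute time.
            self.durations.append(time.perf_counter() - start)

    def p95(self) -> float:
        if len(self.durations) < 2:
            return self.durations[0] if self.durations else 0.0
        # quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile.
        return statistics.quantiles(self.durations, n=20)[18]


def queue_fill_ratio(queue: asyncio.Queue) -> float:
    """Sustained values near 1.0 mean the scanner is blocked on put(): downstream is the bottleneck."""
    return queue.qsize() / queue.maxsize if queue.maxsize > 0 else 0.0
```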

For authoritative reference on async queue semantics and executor lifecycle management, consult the official asyncio Queue documentation and concurrent.futures documentation.