Building Async Pipelines for Batch Floor Plan Processing

Indoor mapping and wayfinding automation pipelines routinely choke on synchronous file I/O when ingesting multi-megabyte DWG exports, high-resolution SVG facility schematics, or legacy DXF archives. For facilities engineers, GIS developers, and indoor navigation teams, the bottleneck is rarely the geometry extraction algorithms themselves; it is the event loop starvation caused by blocking disk reads, unbounded memory allocation, and uncoordinated CPU-bound parser instantiation. Transitioning to a strictly asynchronous ingestion architecture eliminates these failure modes while enabling predictable throughput during peak facility upload windows.

Architecting the Async Ingestion Layer

Batch floor plan ingestion requires deterministic backpressure management. When hundreds of architectural files land in a network-attached storage bucket or cloud object store simultaneously, naive glob() + open() patterns will exhaust file descriptors and saturate RAM before the first vectorization worker completes. The solution is a bounded asyncio.Queue paired with explicit semaphore control, ensuring downstream consumers receive a steady, memory-safe stream of payloads.

Queue capacity should scale with available compute cores rather than arbitrary constants. Setting maxsize to os.cpu_count() * 2 lets the producer run at most two queued items per core ahead of the consumers, which bounds memory without starving them. Concurrent file reads must be throttled using asyncio.Semaphore, particularly when pulling from high-latency network storage. If tasks begin queuing indefinitely, check how each consumer handles errors. It should wrap the processing that follows await queue.get() in a try/finally block, so that queue.task_done() executes even when parsing exceptions occur. Otherwise queue.join() never returns.

import asyncio
import os
import aiofiles
import logging
from itertools import chain
from pathlib import Path
from typing import AsyncGenerator

logger = logging.getLogger("floorplan.ingest")

class FloorPlanIngestor:
    def __init__(self, max_concurrent: int = 8, queue_size: int | None = None):
        self.queue = asyncio.Queue(
            maxsize=queue_size or (os.cpu_count() or 4) * 2
        )
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self._shutdown_event = asyncio.Event()

    async def enqueue_plans(self, source_dir: Path) -> None:
        """Stream DXF/SVG paths into the bounded queue with backpressure."""
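        # rglob() walks the directory tree synchronously; for very large trees,
        # consider running the scan via asyncio.to_thread() to keep the loop responsive.
        candidates = chain(source_dir.rglob("*.dxf"), source_dir.rglob("*.svg"))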
        for fp in candidates:
            await self.queue.put(fp)
            logger.debug("Enqueued: %s | Queue size: %d", fp.name, self.queue.qsize())
        await self.queue.join()  # Block until all items are processed

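    async def worker(self, worker_id: int) -> AsyncGenerator[tuple[bytes, Path], None]:
        """Consume paths, throttle I/O, and yield (raw_bytes, path) to downstream parsers."""
        while not self._shutdown_event.is_set():
            try:
                file_path = await asyncio.wait_for(self.queue.get(), timeout=30.0)
            except asyncio.TimeoutError:
                break  # No work for 30 s: assume the producer has finished

            try:
                # Hold the semaphore only for the read itself; yielding inside it
                # would keep the I/O slot locked while downstream parsing runs.
                async with self.semaphore:
                    async with aiofiles.open(file_path, mode="rb") as f:
                        raw_bytes = await f.read()
                yield raw_bytes, file_path
            except Exception as e:
                logger.error(
                    "[Worker-%d] Ingest failed: %s | %s",
                    worker_id, file_path.name, e, exc_info=True
                )
            finally:
                # Runs after the consumer has finished with this item (or after a
                # read error), so queue.join() in enqueue_plans() can complete.
                self.queue.task_done()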
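The backpressure and try/finally behavior described in this section can be checked in isolation. The sketch below is self-contained and uses integers in place of file paths. `fake_parse` and its rule of failing on every fifth item are hypothetical. Because `maxsize` bounds the queue, `put()` suspends the producer whenever the consumers fall behind. Because `task_done()` sits in `finally`, `queue.join()` still returns even though some items raise.

```python
import asyncio
import os


async def fake_parse(item: int) -> int:
    """Hypothetical stand-in for a parser; fails on every 5th item."""
    await asyncio.sleep(0)  # yield control, as real I/O would
    if item % 5 == 0:
        raise ValueError(f"corrupt payload {item}")
    return item * 2


async def consumer(queue: asyncio.Queue, results: list, errors: list) -> None:
    while True:
        # get() stays outside the try: only items actually received are marked done
        item = await queue.get()
        try:
            results.append(await fake_parse(item))
        except ValueError as exc:
            errors.append(str(exc))
        finally:
            queue.task_done()  # runs on success *and* on failure


async def run(n_items: int = 20) -> tuple[list, list, int]:
    maxsize = (os.cpu_count() or 4) * 2  # bounded queue: put() blocks when full
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    results: list = []
    errors: list = []
    consumers = [asyncio.create_task(consumer(queue, results, errors)) for _ in range(3)]
    peak = 0
    for i in range(1, n_items + 1):
        await queue.put(i)  # suspends here under backpressure
        peak = max(peak, queue.qsize())
    await queue.join()  # returns because every get() was paired with task_done()
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return results, errors, peak


if __name__ == "__main__":
    res, errs, peak = asyncio.run(run())
    print(len(res), len(errs), peak)
```

If `task_done()` were moved out of `finally` and into the success path, the four failing items would never be marked done and `queue.join()` would hang.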

Offloading CPU-Bound CAD/SVG Parsing

The transition from raw binary to parseable geometry must remain strictly non-blocking. Libraries like ezdxf or svglib perform heavy tree traversal, coordinate normalization, and entity resolution. Invoking them directly inside an async def coroutine will freeze the event loop, causing health-check timeouts and cascading pipeline stalls.

Production pipelines must route parser instantiation through asyncio.to_thread() for lightweight I/O-bound workloads, or concurrent.futures.ProcessPoolExecutor for CPU-intensive geometry extraction. For facilities teams managing legacy AutoCAD exports, validate DXF version headers before dispatching to prevent silent parse failures. The Automated Floor Plan Parsing & Vectorization architecture relies on strict separation between I/O multiplexing and CPU-heavy geometry extraction to maintain sub-200ms task scheduling latency.

import asyncio
import concurrent.futures
import logging
from pathlib import Path
from typing import Dict, Any

logger = logging.getLogger("floorplan.ingest")

class GeometryExtractor:
    def __init__(self, max_workers: int = 4):
        self.executor = concurrent.futures.ProcessPoolExecutor(
            max_workers=max_workers
        )

    async def parse_and_extract(self, raw_bytes: bytes, file_path: Path) -> Dict[str, Any]:
        """Dispatch CPU-bound parsing to a separate process pool."""
        try:
            loop = asyncio.get_running_loop()
            # Offload blocking ezdxf/svglib parsing to process pool
            geometry_data = await loop.run_in_executor(
                self.executor,
                self._blocking_parse,
                raw_bytes,
                file_path.suffix
            )
            return geometry_data
        except Exception as e:
            logger.critical("Parser dispatch failed for %s: %s", file_path.name, e)
            raise

    @staticmethod
    def _blocking_parse(raw_bytes: bytes, extension: str) -> Dict[str, Any]:
        """CPU-bound parsing logic. Must not touch async state."""
        if extension.lower() == ".dxf":
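            # ezdxf.read() requires a text stream and is strictly synchronous.
            # UTF-8 matches DXF R2007 (AC1021) and later; older releases are written
            # in an ANSI code page, so errors="replace" may mangle non-ASCII names.
            import io
            import ezdxf
            doc = ezdxf.read(io.StringIO(raw_bytes.decode("utf-8", errors="replace")))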
            return {
                "version": doc.dxfversion,
                "entities": sum(1 for _ in doc.modelspace()),
                "layers": [layer.dxf.name for layer in doc.layers],
                "units": doc.header.get("$INSUNITS", 0)
            }
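        elif extension.lower() == ".svg":
            # SVG parsing via lxml so we can inspect declared namespaces.
            # Entity resolution and network access are disabled for untrusted uploads.
            from lxml import etree
            parser = etree.XMLParser(resolve_entities=False, no_network=True)
            root = etree.fromstring(raw_bytes, parser)
            return {
                "viewbox": root.get("viewBox"),
                # len(root) counts only direct children; walk the whole tree and
                # skip comments/processing instructions (their tags are not strings).
                "elements": sum(1 for el in root.iter() if isinstance(el.tag, str)),
                "namespaces": [ns for ns in root.nsmap if ns]
            }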
        raise ValueError(f"Unsupported extension: {extension}")

Topology Validation & Spatial Attribute Mapping

Once geometry is extracted, the pipeline must normalize coordinate systems, detect structural primitives, and map facility attributes. Wall and door detection algorithms typically rely on line intersection analysis, layer filtering, and proximity clustering. These operations should run immediately after parsing, within the same worker context, to avoid serializing large geometry objects across IPC boundaries.
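The following sketch shows the line-intersection and layer-filtering steps in minimal form. It uses the standard orientation (cross-product) test to find which door segments touch wall segments. The `Segment` type and the `door_wall_contacts` helper are hypothetical. The `A-WALL`/`A-DOOR` layer names follow common National CAD Standard-style naming, but they are an assumption here. The pairing is brute force, O(walls * doors), so a production pipeline would likely prefilter candidates with a spatial index.

```python
from dataclasses import dataclass

Point = tuple[float, float]


@dataclass(frozen=True)
class Segment:
    layer: str
    a: Point
    b: Point


def _orient(p: Point, q: Point, r: Point) -> int:
    """Sign of the cross product (q - p) x (r - p): 1 ccw, -1 cw, 0 collinear."""
    v = (q[0] - p[0]) * (r[1] - p[1]) - (q[1] - p[1]) * (r[0] - p[0])
    return (v > 0) - (v < 0)


def _on_segment(p: Point, q: Point, r: Point) -> bool:
    """For collinear p, q, r: does q lie within the bounding box of segment p-r?"""
    return (min(p[0], r[0]) <= q[0] <= max(p[0], r[0])
            and min(p[1], r[1]) <= q[1] <= max(p[1], r[1]))


def intersects(s1: Segment, s2: Segment) -> bool:
    p1, q1, p2, q2 = s1.a, s1.b, s2.a, s2.b
    o1, o2 = _orient(p1, q1, p2), _orient(p1, q1, q2)
    o3, o4 = _orient(p2, q2, p1), _orient(p2, q2, q1)
    if o1 != o2 and o3 != o4:
        return True  # general case: endpoints straddle each other's lines
    # Collinear special cases: an endpoint lies on the other segment
    return ((o1 == 0 and _on_segment(p1, p2, q1))
            or (o2 == 0 and _on_segment(p1, q2, q1))
            or (o3 == 0 and _on_segment(p2, p1, q2))
            or (o4 == 0 and _on_segment(p2, q1, q2)))


def door_wall_contacts(segments: list[Segment], wall_layer: str = "A-WALL",
                       door_layer: str = "A-DOOR") -> list[tuple[Segment, Segment]]:
    """Layer filtering followed by pairwise intersection tests."""
    walls = [s for s in segments if s.layer.upper() == wall_layer]
    doors = [s for s in segments if s.layer.upper() == door_layer]
    return [(d, w) for d in doors for w in walls if intersects(d, w)]
```

A door segment crossing a wall line is a strong hint of an opening. Proximity clustering would then merge nearby contacts into a single door node.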

Coordinate alignment is critical for indoor navigation accuracy. DXF files frequently use arbitrary local coordinate systems (LCS) that must be transformed into a unified facility grid or georeferenced CRS. Apply affine transformations during the extraction phase, then validate topology using adjacency graphs. Advanced topology validation checks for orphaned nodes, overlapping polygons, and disconnected corridor segments before committing to the spatial database. This step directly feeds into the Async Batch Processing Pipelines specification, ensuring that wayfinding graphs remain consistent across facility updates.

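from typing import Any, Dict

def validate_topology(geometry: Dict[str, Any]) -> Dict[str, Any]:
    """Run deterministic spatial validation before committing to wayfinding graph."""
    issues = []

    # Example checks: a missing door layer (DXF results only; SVG results carry
    # no layer list) and suspiciously sparse drawings. DXF results report
    # "entities", SVG results report "elements".
    layers = geometry.get("layers")
    if layers is not None and "DOOR" not in layers:
        issues.append("MISSING_DOOR_LAYER")
    if geometry.get("entities", geometry.get("elements", 0)) < 50:
        issues.append("LOW_ENTITY_COUNT_POSSIBLY_BLANK")

    # Scale drawing units to meters using DXF $INSUNITS codes
    # (0=Unitless, 1=Inches, 2=Feet, 4=Millimeters, 5=Centimeters, 6=Meters).
    # Unitless and unlisted codes fall back to 1.0, i.e. no rescaling.
    unit_map = {0: 1.0, 1: 0.0254, 2: 0.3048, 4: 0.001, 5: 0.01, 6: 1.0}
    scale_factor = unit_map.get(geometry.get("units", 0), 1.0)

    return {
        "geometry": geometry,
        "scale_factor": scale_factor,
        "validation_issues": issues,
        "is_valid": len(issues) == 0
    }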
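The coordinate alignment and adjacency checks described earlier in this section can be sketched in plain Python. `make_affine` builds an affine transform restricted to uniform scale, rotation, and translation, mapping a DXF local frame onto a facility grid. Here the parameters are assumed to come from surveyed control points. `topology_issues` flags orphaned nodes and a corridor graph that splits into more than one connected component. Overlapping-polygon detection is omitted because it needs a geometry library such as Shapely. All names are hypothetical.

```python
import math
from collections import defaultdict, deque
from typing import Callable, Hashable, Iterable

Point = tuple[float, float]


def make_affine(scale: float, rotation_deg: float, tx: float, ty: float) -> Callable[[Point], Point]:
    """Build p' = scale * R(theta) @ p + t for a DXF local frame -> facility grid."""
    theta = math.radians(rotation_deg)
    cos_t, sin_t = math.cos(theta), math.sin(theta)

    def apply(p: Point) -> Point:
        x, y = p
        return (scale * (cos_t * x - sin_t * y) + tx,
                scale * (sin_t * x + cos_t * y) + ty)

    return apply


def connected_components(edges: Iterable[tuple[Hashable, Hashable]]) -> list[set]:
    """Breadth-first search over an undirected adjacency graph."""
    adjacency: dict = defaultdict(set)
    for u, v in edges:
        adjacency[u].add(v)
        adjacency[v].add(u)
    seen: set = set()
    components: list[set] = []
    for start in adjacency:
        if start in seen:
            continue
        component, frontier = set(), deque([start])
        seen.add(start)
        while frontier:
            node = frontier.popleft()
            component.add(node)
            for neighbor in adjacency[node]:
                if neighbor not in seen:
                    seen.add(neighbor)
                    frontier.append(neighbor)
        components.append(component)
    return components


def topology_issues(nodes: Iterable[Hashable], edges: list[tuple[Hashable, Hashable]]) -> list[tuple]:
    """Flag orphaned nodes and a corridor graph split into several pieces."""
    issues = []
    linked = {n for edge in edges for n in edge}
    orphans = sorted(set(nodes) - linked)
    if orphans:
        issues.append(("ORPHANED_NODES", orphans))
    components = connected_components(edges)
    if len(components) > 1:
        issues.append(("DISCONNECTED_CORRIDORS", len(components)))
    return issues
```

For example, `make_affine(0.001, 90.0, 100.0, 50.0)` converts millimeter coordinates to meters, rotates them a quarter turn, and shifts them onto a grid origin at (100, 50).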

Production Diagnostics & Graceful Shutdown

Facilities tech teams require audit-grade logging and deterministic teardown. Async pipelines must expose runtime metrics, track queue depth, and handle SIGINT/SIGTERM without corrupting in-flight geometry payloads. Use asyncio.gather() with return_exceptions=True so that one failing worker is reported instead of aborting the gather while its siblings keep running. Shut down the ProcessPoolExecutor only after every worker has finished: submitting work to a pool that has already been shut down raises RuntimeError: cannot schedule new futures after shutdown.

Monitor the event loop in two ways. First, enable asyncio debug mode, which logs any callback that runs longer than loop.slow_callback_duration. Second, periodically sample asyncio.all_tasks(). If the task count keeps growing while queue depth stays flat, a consumer is likely deadlocked or a parser is silently swallowing exceptions. Implement structured JSON logging for facilities compliance, capturing file hashes, parse durations, and validation flags.
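These diagnostics can be sketched with the standard library alone:

- `audit_record` emits one JSON line per file with a SHA-256 content hash, the parse duration, and the validation flags.
- `enable_slow_callback_warnings` turns on asyncio debug mode, so that steps blocking the loop for longer than `slow_callback_duration` are logged by the `asyncio` logger.
- `monitor` samples queue depth next to `len(asyncio.all_tasks())`.

The function names, the `floorplan.audit` logger, and the JSON field names are assumptions, not a fixed schema.

```python
import asyncio
import hashlib
import json
import logging
import time

audit_log = logging.getLogger("floorplan.audit")  # hypothetical logger name


def audit_record(file_name: str, raw_bytes: bytes, started: float, validation: dict) -> str:
    """One JSON line per file: content hash, elapsed parse time, validation flags."""
    return json.dumps({
        "file": file_name,
        "sha256": hashlib.sha256(raw_bytes).hexdigest(),
        # `started` is a time.perf_counter() reading taken before parsing began
        "parse_ms": round((time.perf_counter() - started) * 1000, 1),
        "is_valid": bool(validation.get("is_valid", False)),
        "issues": list(validation.get("validation_issues", [])),
    }, sort_keys=True)


def enable_slow_callback_warnings(threshold_s: float = 0.1) -> None:
    """Must be called from inside the running loop (e.g. at the start of main())."""
    loop = asyncio.get_running_loop()
    loop.set_debug(True)  # slow-callback reporting only happens in debug mode
    loop.slow_callback_duration = threshold_s


async def monitor(queue: asyncio.Queue, interval_s: float = 5.0) -> None:
    """Periodically emit queue depth next to the live task count."""
    while True:
        audit_log.info(json.dumps({
            "queue_depth": queue.qsize(),
            "live_tasks": len(asyncio.all_tasks()),
        }))
        await asyncio.sleep(interval_s)
```

One option is to start `monitor(ingestor.queue)` as a background task next to the workers and cancel it during teardown. With the fixed worker pool used here, a rising `live_tasks` value points to tasks being spawned and never awaited.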

async def run_pipeline(source_dir: Path, workers: int = 6) -> None:
    ingestor = FloorPlanIngestor(max_concurrent=workers)
    extractor = GeometryExtractor(max_workers=workers)
    
    # Start workers
    worker_tasks = [
        asyncio.create_task(_consume_worker(ingestor, extractor, i))
        for i in range(workers)
    ]
    
    # Start producer
    producer_task = asyncio.create_task(ingestor.enqueue_plans(source_dir))
    
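    try:
        # return_exceptions=True: a failing task is reported instead of aborting
        # the gather while the remaining workers are still submitting parse jobs.
        results = await asyncio.gather(producer_task, *worker_tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                logger.error("Pipeline task failed: %r", result)
    except asyncio.CancelledError:
        # Cancelling the gather also cancels the producer and every worker.
        logger.info("Pipeline cancellation requested. Stopping workers...")
        ingestor._shutdown_event.set()
        await asyncio.gather(*worker_tasks, return_exceptions=True)
        raise  # propagate the cancellation to the caller after cleanup
    finally:
        # Safe to shut down now: no worker can submit further parse jobs.
        extractor.executor.shutdown(wait=True)
        logger.info("Pipeline teardown complete.")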

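async def _consume_worker(ingestor: FloorPlanIngestor, extractor: GeometryExtractor, wid: int) -> None:
    async for raw_bytes, file_path in ingestor.worker(wid):
        try:
            geo = await extractor.parse_and_extract(raw_bytes, file_path)
        except Exception:
            # parse_and_extract() has already logged the failure; skip this file
            # rather than letting one bad drawing terminate the worker.
            continue
        validated = validate_topology(geo)
        logger.info(
            "Processed: %s | Valid: %s | Entities: %d",
            file_path.name, validated["is_valid"],
            geo.get("entities", geo.get("elements", 0)),  # SVG results report "elements"
        )
        # Dispatch to spatial DB or wayfinding graph builder here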
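For the SIGINT/SIGTERM handling mentioned above, one option on Unix event loops is `loop.add_signal_handler()`. Here the handler simply cancels the pipeline task, so that run_pipeline's `except asyncio.CancelledError` and `finally` blocks perform the teardown. This is a sketch: `run_with_signal_handling` is a hypothetical helper. On Windows event loops, or outside the main thread, the handlers cannot be installed, so the helper falls back to the interpreter's default signal handling.

```python
import asyncio
import signal
from typing import Any, Awaitable, Callable

STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM)


async def run_with_signal_handling(pipeline: Callable[[], Awaitable[Any]]) -> Any:
    """Run a pipeline coroutine and turn SIGINT/SIGTERM into task cancellation."""
    loop = asyncio.get_running_loop()
    main_task = asyncio.ensure_future(pipeline())
    installed = []
    for sig in STOP_SIGNALS:
        try:
            # Cancelling the task lets its except/finally blocks run the teardown.
            loop.add_signal_handler(sig, main_task.cancel)
            installed.append(sig)
        except (NotImplementedError, RuntimeError, ValueError):
            # Windows event loops and non-main threads cannot install handlers.
            pass
    try:
        return await main_task
    finally:
        for sig in installed:
            loop.remove_signal_handler(sig)
```

For example, `asyncio.run(run_with_signal_handling(lambda: run_pipeline(Path("plans"))))`, where the directory name is illustrative. Whether `asyncio.run()` then raises `CancelledError` after a signal depends on whether run_pipeline re-raises the cancellation once teardown is done.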

Integration with Real-Time Wayfinding Updates

The async ingestion pipeline serves as the foundation for continuous facility mapping. Once validated geometry is committed, the system must trigger incremental topology updates rather than full graph rebuilds. By emitting structured events containing modified wall segments, door states, and room boundaries, indoor navigation engines can recalculate shortest-path matrices in near real-time.
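Here is a minimal sketch of such an incremental event. It assumes that the wayfinding graph is stored as undirected corridor edges plus per-node attribute dicts. `diff_topology` compares the previous and new versions of one floor and keeps only added or removed edges and changed or deleted nodes. A consumer can then patch its path matrices instead of rebuilding them. `TopologyDelta`, `to_event`, and the field names are hypothetical.

```python
import json
from dataclasses import asdict, dataclass, field

Edge = tuple[str, str]


@dataclass
class TopologyDelta:
    """Hypothetical event payload describing what changed on one floor."""
    floor_id: str
    added_edges: list[Edge] = field(default_factory=list)
    removed_edges: list[Edge] = field(default_factory=list)
    changed_nodes: dict[str, dict] = field(default_factory=dict)
    removed_nodes: list[str] = field(default_factory=list)

    def is_empty(self) -> bool:
        return not (self.added_edges or self.removed_edges
                    or self.changed_nodes or self.removed_nodes)


def _undirected(edges: list[Edge]) -> set[Edge]:
    # Corridor edges are walkable both ways, so (a, b) and (b, a) are the same edge.
    return {tuple(sorted(e)) for e in edges}


def diff_topology(floor_id: str, old_edges: list[Edge], new_edges: list[Edge],
                  old_nodes: dict[str, dict], new_nodes: dict[str, dict]) -> TopologyDelta:
    """Compare two versions of a floor graph and keep only the differences."""
    old_set, new_set = _undirected(old_edges), _undirected(new_edges)
    return TopologyDelta(
        floor_id=floor_id,
        added_edges=sorted(new_set - old_set),
        removed_edges=sorted(old_set - new_set),
        # New or modified node attributes (e.g. a door switching from open to locked)
        changed_nodes={n: a for n, a in new_nodes.items() if old_nodes.get(n) != a},
        removed_nodes=sorted(set(old_nodes) - set(new_nodes)),
    )


def to_event(delta: TopologyDelta) -> str:
    """Serialize a delta as a JSON event for the navigation engine."""
    return json.dumps(asdict(delta), sort_keys=True)
```

Empty deltas (`is_empty()`) can be dropped before publishing, so re-uploads of an unchanged floor plan trigger no graph work at all.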

Attribute mapping from blueprints—such as ADA compliance tags, fire exit routing, and HVAC zone boundaries—should be resolved during the parsing phase and attached to the topology graph as edge/node properties. This ensures that wayfinding queries respect operational constraints without requiring post-processing joins. For teams scaling across multi-building campuses, the Automated Floor Plan Parsing & Vectorization framework provides the necessary data contracts to synchronize async ingestion with distributed spatial indexing services.

External References & Standards Compliance