# Source code for environmentaltools.drone.drone_missions

#!/usr/bin/env python3
"""
Batch create DJI KMZ mission files from a waypoints CSV using a template KMZ folder.

What this module implements:
 1) Given a DJI root folder, list all subfolders except 'capability' and 'map_preview' (`list_dji_dirs`).
 2) Split the waypoints CSV (e.g. `waypoints_dji_{polygon_no}.csv`) into chunks of `chunk_size` rows (default 50)
    and generate one KMZ per chunk until all points are used (`create`).
    KMZ files are named sequentially ('001', '002', ...); `rename` can later rename them from a CSV list of names.
 3) Inside the `map_preview` folder, create a subfolder per KMZ (name without .kmz) and place a JPG with the same name.
 4) The JPG contains the text showing the file position relative to total (e.g. '3/7').

NOTE(review): an earlier description mentioned a `--dry-run` option that only prints the planned operations;
no command-line entry point is visible in this module, so confirm whether that option still exists elsewhere.
"""

import math
import shutil
import tempfile
from pathlib import Path
import copy
import xml.etree.ElementTree as ET
import csv
import os
import zipfile
from pathlib import Path


# Third-party imports
import pandas as pd
from loguru import logger
from PIL import Image, ImageDraw, ImageFont

import matplotlib
matplotlib.use("Agg")  # Use non-interactive backend
import matplotlib.patheffects as pe
import matplotlib.pyplot as plt



# XML namespace URIs for the DJI WPML mission format.
# WPML_NS qualifies the `wpml:` elements (it is the default namespace of `qname`);
# KML_NS qualifies the standard KML elements such as Folder/Placemark/Point.
WPML_NS = "http://www.uav.com/wpmz/1.0.2"
KML_NS = "http://www.opengis.net/kml/2.2"
# Prefix -> URI mapping of the two namespaces above (not referenced by the code visible in this file).
NS = {"wpml": WPML_NS, "kml": KML_NS}


def qname(tag, ns=WPML_NS):
    """Return *tag* qualified with *ns* in ElementTree's ``{uri}tag`` notation.

    Parameters
    ----------
    tag : str
        Local (unqualified) XML tag name.
    ns : str, optional
        Namespace URI to qualify the tag with. Defaults to ``WPML_NS``.

    Returns
    -------
    str
        The qualified name, i.e. the namespace wrapped in braces followed
        by the tag.

    Examples
    --------
    >>> qname("index")
    '{http://www.uav.com/wpmz/1.0.2}index'
    >>> qname("Document", KML_NS)
    '{http://www.opengis.net/kml/2.2}Document'
    """
    # The doubled braces emit literal "{" / "}" around the namespace URI.
    return "{{{}}}{}".format(ns, tag)


def generate_wpml_from_csv(
    template_path: Path,
    csv_path: Path,
    out_path: Path,
    limit: int | None = None,
    take: str = 'first',
    start_zero: bool = False,
    start_index: int | None = None,
):
    """Generate a WPML file from a template and a CSV of waypoints.

    The first ``<Placemark>`` of the template's ``<Folder>`` is used as a
    prototype: every template placemark is removed and one deep copy of the
    prototype is appended per CSV row, with its coordinates, ``wpml:index``
    and action-group start/end indices rewritten.

    Parameters
    ----------
    template_path : Path
        Path to the WPML template file (e.g. ``waylines.wpml``).
    csv_path : Path
        Path to the CSV file with the waypoints. It must contain columns
        named exactly ``latitude`` and ``longitude``.
    out_path : Path
        Output path for the generated WPML file.
    limit : int, optional
        Maximum number of waypoints to use. ``None`` (or a negative value)
        means all rows are used.
    take : str, optional
        Which rows to keep when ``limit`` applies: ``"last"`` keeps the final
        ``limit`` rows, any other value keeps the first ones.
        Default is ``"first"``.
    start_zero : bool, optional
        If True, number the waypoints from 0. If False, numbering continues
        after the highest ``wpml:index`` found in the template.
        Default is False.
    start_index : int, optional
        Explicit index of the first waypoint. Takes precedence over
        ``start_zero`` when provided.

    Returns
    -------
    None
        The generated WPML document is written to ``out_path``.

    Raises
    ------
    RuntimeError
        If the template has no ``<Folder>`` element, if that folder has no
        ``<Placemark>`` element, or if the CSV lacks the ``latitude`` /
        ``longitude`` columns.
    FileNotFoundError
        If the template file or the CSV file does not exist.

    Notes
    -----
    Waypoints are numbered consecutively by their position in the selected
    rows, so ``take="last"`` also yields indices starting at the chosen
    start index. If the prototype placemark has no ``<Point>`` child, rows
    are skipped and no placemark is written for them.
    """
    tree = ET.parse(str(template_path))
    root = tree.getroot()

    # Registered prefixes only influence how the tree is serialised below:
    # KML becomes the default namespace, WPML elements get the "wpml:" prefix.
    ET.register_namespace('', KML_NS)
    ET.register_namespace('wpml', WPML_NS)

    folder = root.find('.//' + qname('Folder', KML_NS))
    if folder is None:
        raise RuntimeError('No <Folder> element found in template')

    # Starting index priority: explicit start_index > start_zero > continue
    # after the largest wpml:index already present in the template.
    if start_index is not None:
        next_index = int(start_index)
    elif start_zero:
        next_index = 0
    else:
        existing_indexes = [
            int(el.text)
            for el in root.findall('.//' + qname('index'))
            if el.text and el.text.strip().isdigit()
        ]
        next_index = max(existing_indexes) + 1 if existing_indexes else 0

    df = pd.read_csv(str(csv_path))
    if 'latitude' not in df.columns or 'longitude' not in df.columns:
        raise RuntimeError('CSV must contain latitude and longitude columns')

    if limit is not None and limit >= 0:
        df = df.tail(limit) if take == 'last' else df.head(limit)

    placemarks = folder.findall(qname('Placemark', KML_NS))
    if not placemarks:
        raise RuntimeError('No <Placemark> found in template')
    prototype = placemarks[0]

    # Drop every template placemark so no template coordinates leak into the
    # output; the prototype object itself stays alive for cloning.
    for template_pm in placemarks:
        folder.remove(template_pm)

    point_tag = qname('Point', KML_NS)
    coordinates_tag = qname('coordinates', KML_NS)
    action_index_suffixes = ('actionGroupStartIndex', 'actionGroupEndIndex')

    # enumerate() gives a positional counter. The DataFrame index labels must
    # not be used here: after tail() they no longer start at zero.
    for offset, (lat_value, lon_value) in enumerate(
        zip(df['latitude'], df['longitude'])
    ):
        lat = float(lat_value)
        lon = float(lon_value)
        idx = next_index + offset

        pm = copy.deepcopy(prototype)

        point = pm.find(point_tag)
        if point is None:
            # Unexpected template structure: skip this waypoint.
            continue
        coords = point.find(coordinates_tag)
        if coords is None:
            coords = ET.SubElement(point, coordinates_tag)
        coords.text = f"\n {lon},{lat}\n "

        idx_elem = pm.find(qname('index'))
        if idx_elem is None:
            idx_elem = ET.SubElement(pm, qname('index'))
        idx_elem.text = str(idx)

        # Pin every action group (at any depth) to this single waypoint.
        # Matching on the tag suffix also covers unqualified tags.
        for el in pm.iter():
            if isinstance(el.tag, str) and el.tag.endswith(action_index_suffixes):
                el.text = str(idx)

        folder.append(pm)

    tree.write(str(out_path), encoding='utf-8', xml_declaration=True)
def build_kmz_from_template(
    kmz_folder: Path,
    csv_path: Path,
    limit: int | None,
    take: str,
    start_zero: bool,
    out_dir: Path | None,
    start_index: int | None = None,
    polygon_no: str | None = None,  # polygon identifier for output naming
    chunk_no: str | None = None,  # chunk/section identifier for output naming
):
    """Build a KMZ file from a template folder and a waypoints CSV.

    Parameters
    ----------
    kmz_folder : Path
        Template folder containing a ``wpmz`` subfolder with ``template.kml``
        and at least one ``*.wpml`` template.
    csv_path : Path
        Path to the CSV file containing waypoint data.
    limit : int, optional
        Maximum number of waypoints to process. If None, all are processed.
    take : str
        Row-selection strategy when ``limit`` applies ("first" or "last").
    start_zero : bool
        If True, start waypoint indexing from 0 in the generated WPML.
    out_dir : Path, optional
        Output directory for the generated KMZ. If None (or empty), the
        parent of ``kmz_folder`` is used.
    start_index : int, optional
        Explicit starting waypoint index. Overrides ``start_zero``.
    polygon_no : str, optional
        Polygon identifier used as a subdirectory of the output location.
    chunk_no : str, optional
        Chunk/section identifier used as the KMZ file stem.

    Returns
    -------
    Path
        Path to the generated KMZ file.

    Raises
    ------
    SystemExit
        If the template folder, its ``wpmz`` subfolder, ``template.kml`` or a
        ``*.wpml`` template is not found.
    RuntimeError
        If the archive is not found after creation.

    Notes
    -----
    The archive content is staged in a dedicated subdirectory of a temporary
    directory and the zip is written next to (not inside) that staging
    directory, so the archive can never contain itself or any helper
    directory. An existing KMZ at the target location is overwritten.

    Output naming convention:

    - With ``polygon_no``: ``<polygon_no>/<chunk_no>.kmz``
    - Without ``polygon_no``: ``<chunk_no>.kmz``
    """
    if not kmz_folder.is_dir():
        raise SystemExit(f"KMZ folder not found: {kmz_folder}")

    wpmz_dir = kmz_folder / "wpmz"
    if not wpmz_dir.is_dir():
        raise SystemExit(f"No wpmz folder inside: {wpmz_dir}")

    template_kml = wpmz_dir / "template.kml"
    if not template_kml.exists():
        raise SystemExit(f"template.kml not found in {wpmz_dir}")

    # Sorted so the chosen template does not depend on directory listing order.
    wpml_candidates = sorted(wpmz_dir.glob("*.wpml"))
    if not wpml_candidates:
        raise SystemExit(f"No .wpml template found in {wpmz_dir}")
    wpml_template = wpml_candidates[0]

    out_name = f"{polygon_no}/{chunk_no}" if polygon_no else f"{chunk_no}"
    destination_root = Path(out_dir) if out_dir else kmz_folder.parent
    target_zip = destination_root / f"{out_name}.kmz"

    with tempfile.TemporaryDirectory(prefix="kmz_build_") as tmp_name:
        tmp = Path(tmp_name)

        # Only the staging directory is archived.
        staging = tmp / "content"
        staged_wpmz = staging / "wpmz"
        staged_wpmz.mkdir(parents=True)

        shutil.copy2(template_kml, staged_wpmz / "template.kml")

        # NOTE(review): the generated file is named "waypoints.wpml"; confirm
        # this is the name the consuming DJI application expects in wpmz/.
        generate_wpml_from_csv(
            wpml_template,
            csv_path,
            staged_wpmz / "waypoints.wpml",
            limit=limit,
            take=take,
            start_zero=start_zero,
            start_index=start_index,
        )

        # The archive lives in tmp/, i.e. outside the directory being zipped.
        zip_tmp = Path(
            shutil.make_archive(str(tmp / "archive"), "zip", root_dir=str(staging))
        )
        if not zip_tmp.exists():
            raise RuntimeError(f"Expected archive not found: {zip_tmp}")

        target_zip.parent.mkdir(parents=True, exist_ok=True)
        if target_zip.exists():
            target_zip.unlink()  # allow overwrite
        # Moving also renames .zip -> .kmz.
        shutil.move(str(zip_tmp), str(target_zip))

    return target_zip
def list_dji_dirs(root: Path):
    """Return the names of the subdirectories of *root* usable for KMZ naming.

    Parameters
    ----------
    root : Path
        Directory whose immediate children are scanned.

    Returns
    -------
    list of str
        Names of the child directories in sorted path order, leaving out the
        'capability' and 'map_preview' folders.
    """
    excluded = {"capability", "map_preview"}
    names = []
    for entry in sorted(root.iterdir()):
        if not entry.is_dir():
            continue
        if entry.name in excluded:
            continue
        names.append(entry.name)
    return names
def _detect_lon_lat_columns(columns):
    """Return ``(lon_column, lat_column)`` found in *columns*, or None if either is missing."""
    lon_cols = [c for c in columns if str(c).lower() in ("lon", "lng", "longitude", "x")]
    lat_cols = [c for c in columns if str(c).lower() in ("lat", "latitude", "y")]
    if lon_cols and lat_cols:
        return lon_cols[0], lat_cols[0]
    return None


def _preview_text_bbox(draw, text, font):
    """Return the ``(left, top, right, bottom)`` box of *text* drawn at the origin."""
    if hasattr(draw, "textbbox"):
        return draw.textbbox((0, 0), text, font=font)
    # Older Pillow only reports a size, without origin offsets.
    width, height = draw.textsize(text, font=font)
    return (0, 0, width, height)


def _find_preview_font():
    """Return the path of the first available TrueType font (bold preferred), or None."""
    font_candidates = [
        r"C:\Windows\Fonts\arialbd.ttf",  # Windows Arial Bold
        r"C:\Windows\Fonts\ARIALBD.TTF",  # Windows Arial Bold (uppercase)
        r"C:\Windows\Fonts\arial.ttf",  # Windows Arial
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",  # Linux bold
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",  # Linux regular
    ]
    for candidate in font_candidates:
        if Path(candidate).exists():
            return candidate

    # Nothing predefined: scan the Windows font directory.
    # Priority order: bold fonts -> arial fonts -> any TTF.
    win_fonts = Path(r"C:\Windows\Fonts")
    if win_fonts.exists():
        for pattern in ("*bold*.ttf", "*bd*.ttf", "*arial*.ttf", "*.ttf"):
            for found in win_fonts.glob(pattern):
                if found.exists():
                    return str(found)
    return None


def _fit_preview_font(draw, text, size, font_path):
    """Return the largest tried font size at which *text* fits in 99% of *size*, or None."""
    max_w = size[0] * 0.99
    max_h = size[1] * 0.99
    # Start almost as tall as the canvas and shrink by 8% per step.
    fs = max(12, int(size[1] * 0.995))
    while fs >= 8:
        candidate = ImageFont.truetype(font_path, fs)
        left, top, right, bottom = _preview_text_bbox(draw, text, candidate)
        if (bottom - top) <= max_h and (right - left) <= max_w:
            return candidate
        fs = int(fs * 0.92)
    return None


def _render_text_preview(jpg_path, text, size, dpi):
    """Draw *text* centred on a white canvas with Pillow and save it as a JPEG."""
    img = Image.new("RGB", size, "white")
    draw = ImageDraw.Draw(img)

    font_path = _find_preview_font()
    font = _fit_preview_font(draw, text, size, font_path) if font_path else None
    if font is None:
        font = ImageFont.load_default()

    left, top, right, bottom = _preview_text_bbox(draw, text, font)
    w = right - left
    h = bottom - top
    # Subtract the bbox origin: the inked area does not start exactly at the
    # drawing position, so ignoring it shifts large glyphs off-centre.
    x = (size[0] - w) / 2 - left
    y = (size[1] - h) / 2 - top

    stroke_w = max(2, int(h * 0.08))
    text_code = getattr(draw.text, "__code__", None)
    if text_code is not None and 'stroke_width' in text_code.co_varnames:
        # Pillow with stroke support: black fill with a white outline.
        draw.text(
            (x, y),
            text,
            fill="black",
            font=font,
            stroke_width=stroke_w,
            stroke_fill="white",
        )
    else:
        # No stroke support: fake the outline with offset white copies.
        for ox, oy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
            draw.text((x + ox, y + oy), text, fill="white", font=font)
        draw.text((x, y), text, fill="black", font=font)

    img.save(str(jpg_path), format="JPEG", dpi=(dpi, dpi))


def _render_waypoint_map(jpg_path, text, size, dpi, lon, lat):
    """Plot the waypoints with matplotlib, overlay *text* at the centre and save the figure."""
    fig, ax = plt.subplots(figsize=(size[0] / dpi, size[1] / dpi), dpi=dpi)
    try:
        ax.set_facecolor("#f0f0f0")
        ax.scatter(lon, lat, s=10, c="red", alpha=0.8)

        minx, maxx = lon.min(), lon.max()
        miny, maxy = lat.min(), lat.max()
        # Identical coordinates would give a zero-width axis range.
        if minx == maxx:
            minx -= 0.0005
            maxx += 0.0005
        if miny == maxy:
            miny -= 0.0005
            maxy += 0.0005

        # 6% margin around the data bounds.
        dx = (maxx - minx) * 0.06
        dy = (maxy - miny) * 0.06
        ax.set_xlim(minx - dx, maxx + dx)
        ax.set_ylim(miny - dy, maxy + dy)
        ax.set_xticks([])
        ax.set_yticks([])

        fs = int(size[1] * 0.45)  # font size relative to the image height
        txt_artist = ax.text(
            0.5 * (minx + maxx),
            0.5 * (miny + maxy),
            text,
            color="black",
            fontsize=fs,
            ha="center",
            va="center",
            weight="bold",
        )
        # White stroke behind the glyphs keeps the text readable over points.
        txt_artist.set_path_effects(
            [
                pe.Stroke(linewidth=max(2, int(fs * 0.06)), foreground="white"),
                pe.Normal(),
            ]
        )

        fig.subplots_adjust(left=0, right=1, top=1, bottom=0)
        fig.savefig(str(jpg_path), dpi=dpi)
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)


def create_preview(
    jpg_path: Path,
    text: str,
    size=(1920, 1080),
    waypoints_csv: Path | None = None,
    dpi: int = 96,
):
    """Create a preview image with a text overlay and an optional waypoints map.

    Parameters
    ----------
    jpg_path : Path
        Output path of the JPEG preview. Its parent directory is created if
        it does not exist.
    text : str
        Text to display on the image (e.g. the mission position "3/7").
    size : tuple, optional
        Image size as (width, height) in pixels. Default is (1920, 1080).
    waypoints_csv : Path, optional
        CSV file with waypoints. When given and it contains rows plus a
        longitude column ('lon', 'lng', 'longitude' or 'x') and a latitude
        column ('lat', 'latitude' or 'y'), matched case-insensitively, the
        preview is a scatter map of the waypoints with the text on top.
        Otherwise a text-only image is produced. Default is None.
    dpi : int, optional
        Resolution in dots per inch for the output image. Default is 96.

    Returns
    -------
    None
        Saves the preview image to ``jpg_path``.

    Raises
    ------
    RuntimeError
        If the module-level ``Image`` name is None (Pillow unavailable).

    Notes
    -----
    The text-only Pillow image is rendered only when the map cannot be
    drawn, so the font-fitting work is not wasted on map previews.
    """
    if Image is None:
        raise RuntimeError(
            "Pillow is required to create preview images. Install with: pip install Pillow"
        )

    # Both rendering paths write to jpg_path, so make sure the folder exists.
    jpg_path.parent.mkdir(parents=True, exist_ok=True)

    if waypoints_csv is not None:
        dfw = pd.read_csv(str(waypoints_csv))
        detected = _detect_lon_lat_columns(dfw.columns)
        if detected is not None and len(dfw) > 0:
            lon_col, lat_col = detected
            _render_waypoint_map(
                jpg_path,
                text,
                size,
                dpi,
                dfw[lon_col].astype(float),
                dfw[lat_col].astype(float),
            )
            return

    _render_text_preview(jpg_path, text, size, dpi)
def create(
    template_kmz_folder: Path,
    csv_path: Path,
    chunk_size: int = 50,
    take: str = "first",
    out_dir: Path | None = None,
    # missions_csv: Path | None = None,
    polygon_no: str = "001",  # polygon identifier string for output directory structure
):
    """Generate multiple KMZ mission files by splitting a waypoints CSV into chunks.

    The CSV is cut into consecutive chunks of ``chunk_size`` rows. For each
    chunk a KMZ mission is built from the template folder and a preview image
    showing the chunk position (e.g. "3/7") over the chunk's waypoints is
    written.

    Parameters
    ----------
    template_kmz_folder : Path
        Template KMZ folder used as a base for new missions.
    csv_path : Path
        Input CSV file containing the waypoints.
    chunk_size : int, optional
        Maximum number of waypoints per generated KMZ. Must be positive.
        Default is 50.
    take : str, optional
        Forwarded to the KMZ builder. It has no effect here because every
        chunk is built without a row limit. Default is "first".
    out_dir : Path, optional
        Base output directory. If None, the relative directory "missions"
        is used.
    polygon_no : str, optional
        Polygon identifier used as output subdirectory. Default is "001".

    Returns
    -------
    None
        Creates KMZ files and preview images in the output structure.

    Raises
    ------
    ValueError
        If ``chunk_size`` is not a positive number.
    SystemExit
        If the template folder does not exist, the CSV file is not found, or
        the CSV contains no rows.

    Notes
    -----
    Resulting layout (``<base>`` is ``out_dir`` or "missions")::

        <base>/<polygon_no>/001/001.kmz
        <base>/<polygon_no>/002/002.kmz
        <base>/<polygon_no>/map_preview/001/001.jpg
        <base>/<polygon_no>/map_preview/002/002.jpg

    Waypoint indexing restarts from 0 in every KMZ. Inputs are validated
    before any output directory is created, and each KMZ is generated
    directly in its final folder.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    if not template_kmz_folder.exists() or not template_kmz_folder.is_dir():
        raise SystemExit(f"Template KMZ folder not found: {template_kmz_folder}")
    if not csv_path.exists():
        raise SystemExit(f"CSV not found: {csv_path}")

    df = pd.read_csv(str(csv_path))
    total = len(df)
    if total == 0:
        raise SystemExit("CSV contains no points")

    base_output = (Path(out_dir) if out_dir else Path("missions")) / polygon_no
    map_preview_dir = base_output / "map_preview"
    map_preview_dir.mkdir(parents=True, exist_ok=True)

    files_needed = math.ceil(total / chunk_size)

    for i in range(files_needed):
        start = i * chunk_size
        end = min(start + chunk_size, total)

        # Sequential naming: 001, 002, 003, ...
        name = f"{i+1:03d}"
        kmz_name = f"{name}.kmz"
        target_dir = base_output / name
        preview_jpg = map_preview_dir / name / f"{name}.jpg"

        logger.info(f"[{i+1}/{files_needed}] {kmz_name}: points {start}..{end-1}")

        # The context manager removes the chunk CSV even if a step fails.
        with tempfile.TemporaryDirectory(prefix="kmz_chunk_") as tmp_name:
            tmp_csv = Path(tmp_name) / f"waypoints_dji_{polygon_no}.csv"
            df.iloc[start:end].to_csv(tmp_csv, index=False)

            # Build straight into <base>/<polygon_no>/NNN/NNN.kmz: with
            # out_dir=target_dir and no polygon prefix the builder writes
            # "<out_dir>/<chunk_no>.kmz", so no follow-up move is needed.
            target = build_kmz_from_template(
                template_kmz_folder,
                tmp_csv,
                limit=None,
                take=take,
                start_zero=True,
                out_dir=target_dir,
                start_index=0,  # every KMZ numbers its waypoints from 0
                polygon_no=None,
                chunk_no=name,
            )

            preview_jpg.parent.mkdir(parents=True, exist_ok=True)
            create_preview(preview_jpg, f"{i+1}/{files_needed}", waypoints_csv=tmp_csv)

        logger.info(f" -> created {target} and preview {preview_jpg}")
def rename(path: str, polygon_no: str, missions_csv: Path):
    """Rename mission folders and files using names listed in a CSV file.

    Every numeric segment directory ``NNN`` under
    ``path/missions/polygon_no`` is renamed to the ``NNN``-th name of the CSV
    (1-based). Files inside the directory whose name contains ``NNN`` are
    renamed accordingly, file names inside KMZ archives are updated, and the
    matching ``map_preview/NNN`` folder is renamed too.

    Parameters
    ----------
    path : str
        Base path where the ``missions`` directory is located.
    polygon_no : str
        Polygon identifier; must match the one used when creating missions.
    missions_csv : Path
        CSV file with the new names: the first column of each non-empty row
        is used, in order.

    Returns
    -------
    None
        Files and directories are renamed in place.

    Raises
    ------
    FileNotFoundError
        If the polygon directory does not exist.

    Notes
    -----
    Expected layout::

        path/missions/polygon_no/001/001.kmz
        path/missions/polygon_no/map_preview/001/001.jpg

    Segments are skipped with a warning when their directory name is not a
    positive decimal number, when the CSV has no name for them, or when a
    directory with the new name already exists. The last check runs before
    anything is touched, so a name collision never leaves a segment half
    renamed.
    """
    with open(missions_csv, newline="", encoding="utf-8") as f:
        names = [row[0].strip() for row in csv.reader(f) if row and row[0].strip()]

    polygon_dir = Path(path) / "missions" / polygon_no
    if not polygon_dir.exists():
        raise FileNotFoundError(f"Polygon directory not found: {polygon_dir}")

    # sorted() snapshots the listing, so directories renamed below are not
    # visited a second time under their new name.
    for segment_dir in sorted(polygon_dir.iterdir()):
        if not segment_dir.is_dir():
            continue

        segment_no = segment_dir.name
        # isdecimal() accepts exactly the digit strings that int() can parse.
        if not segment_no.isdecimal():
            logger.warning(f"Skipping non-numeric directory: {segment_no}")
            continue

        segment_idx = int(segment_no) - 1  # zero-based position in the CSV
        if segment_idx < 0:
            logger.warning(f"Skipping invalid segment number: {segment_no}")
            continue
        if segment_idx >= len(names):
            logger.warning(f"No name in missions.csv for segment {segment_no}, skipping.")
            continue

        new_name = names[segment_idx]
        if new_name == segment_no:
            logger.info(f"Segment {segment_no} already has its final name; nothing to do")
            continue

        new_segment_dir = segment_dir.parent / new_name
        if new_segment_dir.exists():
            logger.warning(
                f"Target {new_segment_dir} already exists; skipping segment {segment_no}"
            )
            continue

        logger.info(f"Renaming segment {segment_no} -> {new_name}")

        # Snapshot the listing: entries are renamed while we loop over them.
        for file in list(segment_dir.iterdir()):
            if segment_no not in file.name:
                continue
            new_file = file.with_name(file.name.replace(segment_no, new_name))
            file.rename(new_file)
            if new_file.suffix.lower() == ".kmz":
                _update_kmz_internal_names(new_file, segment_dir, segment_no, new_name)

        segment_dir.rename(new_segment_dir)
        logger.info(f" -> {segment_no} renamed to {new_name}")

        _update_preview_directory(new_segment_dir, segment_no, new_name)
def _update_kmz_internal_names(kmz_file: Path, segment_dir: Path, old_name: str, new_name: str): """Update internal file names within a KMZ archive. Parameters ---------- kmz_file : Path Path to the KMZ file to update. segment_dir : Path Directory containing the KMZ file. old_name : str Original segment name to replace. new_name : str New segment name to use. """ # Extract KMZ contents to temporary directory with zipfile.ZipFile(kmz_file, "r") as zin: tmp_dir = segment_dir / "_tmp_kmz" zin.extractall(tmp_dir) # Rename internal files that contain the old segment name for root, dirs, files in os.walk(tmp_dir): for fname in files: if old_name in fname: src = Path(root) / fname dst = Path(root) / fname.replace(old_name, new_name) src.rename(dst) # Repackage the KMZ file with updated internal names with zipfile.ZipFile(kmz_file, "w", zipfile.ZIP_DEFLATED) as zout: for root, dirs, files in os.walk(tmp_dir): for fname in files: fpath = Path(root) / fname arcname = fpath.relative_to(tmp_dir) zout.write(fpath, arcname) # Clean up temporary directory for root, dirs, files in os.walk(tmp_dir, topdown=False): for fname in files: (Path(root) / fname).unlink() for dname in dirs: (Path(root) / dname).rmdir() tmp_dir.rmdir() def _update_preview_directory(segment_dir: Path, old_name: str, new_name: str): """Update map preview directory and files for a renamed segment. Parameters ---------- segment_dir : Path The renamed segment directory. old_name : str Original segment name. new_name : str New segment name. 
""" map_preview_dir = segment_dir.parent / "map_preview" old_preview_sub = map_preview_dir / old_name new_preview_sub = map_preview_dir / new_name if old_preview_sub.exists(): # Rename files within the preview subdirectory for file in old_preview_sub.iterdir(): if old_name in file.name: new_file = file.with_name(file.name.replace(old_name, new_name)) file.rename(new_file) # Rename the preview subdirectory itself old_preview_sub.rename(new_preview_sub) logger.info(f" -> map_preview/{old_name} renamed to map_preview/{new_name}")