import os import re import gc import traceback import gradio as gr import numpy as np import spaces import torch import random import time import tempfile from PIL import Image, ImageDraw from typing import Iterable, Optional from transformers import ( AutoImageProcessor, AutoModelForDepthEstimation, ) from huggingface_hub import hf_hub_download from safetensors.torch import load_file as safetensors_load_file from diffusers import WanImageToVideoPipeline from diffusers.utils import export_to_video from gradio.themes import Soft from gradio.themes.utils import colors, fonts, sizes # ============================================================ # Theme # ============================================================ colors.orange_red = colors.Color( name="orange_red", c50="#FFF0E5", c100="#FFE0CC", c200="#FFC299", c300="#FFA366", c400="#FF8533", c500="#FF4500", c600="#E63E00", c700="#CC3700", c800="#B33000", c900="#992900", c950="#802200", ) class OrangeRedTheme(Soft): def __init__( self, *, primary_hue: colors.Color | str = colors.gray, secondary_hue: colors.Color | str = colors.orange_red, neutral_hue: colors.Color | str = colors.slate, text_size: sizes.Size | str = sizes.text_lg, font: fonts.Font | str | Iterable[fonts.Font | str] = ( fonts.GoogleFont("Outfit"), "Arial", "sans-serif", ), font_mono: fonts.Font | str | Iterable[fonts.Font | str] = ( fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace", ), ): super().__init__( primary_hue=primary_hue, secondary_hue=secondary_hue, neutral_hue=neutral_hue, text_size=text_size, font=font, font_mono=font_mono, ) super().set( background_fill_primary="*primary_50", background_fill_primary_dark="*primary_900", body_background_fill="linear-gradient(135deg, *primary_200, *primary_100)", body_background_fill_dark="linear-gradient(135deg, *primary_900, *primary_800)", button_primary_text_color="white", button_primary_text_color_hover="white", button_primary_background_fill="linear-gradient(90deg, *secondary_500, *secondary_600)", button_primary_background_fill_hover="linear-gradient(90deg, *secondary_600, *secondary_700)", button_primary_background_fill_dark="linear-gradient(90deg, *secondary_600, *secondary_700)", button_primary_background_fill_hover_dark="linear-gradient(90deg, *secondary_500, *secondary_600)", button_secondary_text_color="black", button_secondary_text_color_hover="white", button_secondary_background_fill="linear-gradient(90deg, *primary_300, *primary_300)", button_secondary_background_fill_hover="linear-gradient(90deg, *primary_400, *primary_400)", button_secondary_background_fill_dark="linear-gradient(90deg, *primary_500, *primary_600)", button_secondary_background_fill_hover_dark="linear-gradient(90deg, *primary_500, *primary_500)", slider_color="*secondary_500", slider_color_dark="*secondary_600", block_title_text_weight="600", block_border_width="3px", block_shadow="*shadow_drop_lg", button_primary_shadow="*shadow_drop_lg", button_large_padding="11px", color_accent_soft="*primary_100", block_label_background_fill="*primary_200", ) orange_red_theme = OrangeRedTheme() # ============================================================ # Device # ============================================================ device = torch.device("cuda" if torch.cuda.is_available() else "cpu") dtype = torch.bfloat16 print("CUDA_VISIBLE_DEVICES=", os.environ.get("CUDA_VISIBLE_DEVICES")) print("torch.__version__ =", torch.__version__) print("torch.version.cuda =", torch.version.cuda) print("cuda available:", torch.cuda.is_available()) print("cuda device count:", torch.cuda.device_count()) if torch.cuda.is_available(): print("current device:", torch.cuda.current_device()) print("device name:", torch.cuda.get_device_name(torch.cuda.current_device())) print("Using device:", device) # ============================================================ # AIO version (Space variable) # ============================================================ AIO_REPO_ID = "prithivMLmods/Qwen-Image-Edit-Rapid-AIO-V19" DEFAULT_AIO_VERSION = "v19" _VER_RE = re.compile(r"^v\d+$") _DIGITS_RE = re.compile(r"^\d+$") def _normalize_version(raw: str) -> Optional[str]: if raw is None: return None s = str(raw).strip() if not s: return None if _VER_RE.fullmatch(s): return s # forgiving: allow "21" -> "v21" if _DIGITS_RE.fullmatch(s): return f"v{s}" return None _AIO_ENV_RAW = os.environ.get("AIO_VERSION", "") _AIO_ENV_NORM = _normalize_version(_AIO_ENV_RAW) AIO_VERSION = _AIO_ENV_NORM or DEFAULT_AIO_VERSION AIO_VERSION_SOURCE = "env" if _AIO_ENV_NORM else "default(v19)" print(f"AIO_VERSION (env raw) = {_AIO_ENV_RAW!r}") print(f"AIO_VERSION (normalized) = {_AIO_ENV_NORM!r}") print(f"Using AIO_VERSION = {AIO_VERSION} ({AIO_VERSION_SOURCE})") # ============================================================ # Pipeline # ============================================================ from diffusers import FlowMatchEulerDiscreteScheduler # noqa: F401 from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3 pipe = None def _clear_model_memory(): gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() def _load_pipe_with_version(version: str) -> QwenImageEditPlusPipeline: print(f"πŸ“¦ Loading AIO transformer from: {AIO_REPO_ID}") try: transformer = QwenImageTransformer2DModel.from_pretrained( AIO_REPO_ID, torch_dtype=dtype, device_map="cuda", ) print("βœ… Successfully loaded the Rapid AIO transformer!") except Exception as e: print(f"❌ Failed to load transformer: {e}") raise # Stop here so we see the clear error p = QwenImageEditPlusPipeline.from_pretrained( "Qwen/Qwen-Image-Edit-2511", transformer=transformer, torch_dtype=dtype, ).to(device) return p # Forgiving load: try env/default version, fallback to v19 if it fails try: pipe = _load_pipe_with_version(AIO_VERSION) except Exception as e: print("❌ Failed to load requested AIO_VERSION. Falling back to v19.") print("---- exception ----") print(traceback.format_exc()) print("-------------------") AIO_VERSION = DEFAULT_AIO_VERSION AIO_VERSION_SOURCE = "fallback_to_v19" pipe = _load_pipe_with_version(AIO_VERSION) # Apply FA3 Optimization try: pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3()) print("Flash Attention 3 Processor set successfully.") except Exception as e: print(f"Warning: Could not set FA3 processor: {e}") def _release_edit_pipe(): global pipe if pipe is None: return try: pipe = None LOADED_ADAPTERS.clear() finally: _clear_model_memory() def _get_edit_pipe() -> QwenImageEditPlusPipeline: global pipe, _i2v_pipe, AIO_VERSION, AIO_VERSION_SOURCE if pipe is not None: return pipe if _i2v_pipe is not None: _i2v_pipe = None _clear_model_memory() try: pipe = _load_pipe_with_version(AIO_VERSION) except Exception: print("❌ Failed to load requested AIO_VERSION. Falling back to v19.") print("---- exception ----") print(traceback.format_exc()) print("-------------------") AIO_VERSION = DEFAULT_AIO_VERSION AIO_VERSION_SOURCE = "fallback_to_v19" pipe = _load_pipe_with_version(AIO_VERSION) try: pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3()) print("Flash Attention 3 Processor set successfully.") except Exception as e: print(f"Warning: Could not set FA3 processor: {e}") return pipe MAX_SEED = np.iinfo(np.int32).max # ============================================================ # Wan I2V pipeline (lazy-loaded on first call) # ============================================================ WAN_MODEL_ID = os.environ.get("WAN_MODEL_ID", "Wan-AI/Wan2.1-I2V-14B-480P-Diffusers").strip() _i2v_pipe = None _i2v_last_load_seconds = None _i2v_prewarm_status = "Not loaded" _i2v_prewarm_duration = int(os.environ.get("I2V_PREWARM_DURATION", "240")) _i2v_max_duration = int(os.environ.get("I2V_MAX_GPU_DURATION", "600")) _i2v_cold_start_buffer = int(os.environ.get("I2V_COLD_START_BUFFER", "220")) def get_i2v_duration(image, prompt, steps, guidance_scale, num_frames, fps, seed, randomize_seed): """ Estimate GPU lease duration for ZeroGPU. Cold starts need significantly longer due to model load. """ try: s = int(steps) except Exception: s = 8 try: f = int(num_frames) except Exception: f = 25 base = 45 + (s * 6) + int(f * 1.8) if _i2v_pipe is None: base += _i2v_cold_start_buffer if float(guidance_scale) > 3.0: base += 25 return max(120, min(base, _i2v_max_duration)) def _get_i2v_pipe() -> WanImageToVideoPipeline: global _i2v_pipe, _i2v_last_load_seconds if _i2v_pipe is None: load_start = time.perf_counter() _release_edit_pipe() print(f"πŸ“¦ Loading Wan I2V pipeline from: {WAN_MODEL_ID}") _i2v_pipe = WanImageToVideoPipeline.from_pretrained( WAN_MODEL_ID, torch_dtype=torch.bfloat16, ).to(device) _i2v_last_load_seconds = time.perf_counter() - load_start print(f"βœ… Wan I2V pipeline loaded in {_i2v_last_load_seconds:.2f}s.") else: print("ℹ️ Wan I2V pipeline already loaded; reusing in-memory model.") return _i2v_pipe @spaces.GPU(duration=_i2v_prewarm_duration) def prewarm_i2v_model(): global _i2v_prewarm_status start = time.perf_counter() try: _get_i2v_pipe() elapsed = time.perf_counter() - start message = f"I2V model ready. Prewarm call took {elapsed:.2f}s." _i2v_prewarm_status = message print(f"[TIMING][i2v] {message}") return message except Exception as e: elapsed = time.perf_counter() - start message = f"Prewarm failed after {elapsed:.2f}s: {type(e).__name__}: {e}" _i2v_prewarm_status = message print(f"[TIMING][i2v] {message}") return message def get_i2v_prewarm_status(): return _i2v_prewarm_status def _auto_prewarm_i2v_if_enabled(): global _i2v_prewarm_status enabled = os.environ.get("I2V_AUTOPREWARM_ON_START", "").strip().lower() in {"1", "true", "yes", "on"} if not enabled: _i2v_prewarm_status = "Startup auto-prewarm is disabled" print("[TIMING][i2v] auto_prewarm_on_start=disabled") return print("[TIMING][i2v] auto_prewarm_on_start=enabled") try: start = time.perf_counter() _get_i2v_pipe() elapsed = time.perf_counter() - start msg = f"I2V model ready from startup auto-prewarm in {elapsed:.2f}s." _i2v_prewarm_status = msg print(f"[TIMING][i2v] auto_prewarm_success: {msg}") except Exception as e: _i2v_prewarm_status = f"Startup auto-prewarm failed: {e}" print(f"[TIMING][i2v] auto_prewarm_failed: {e}") @spaces.GPU(duration=get_i2v_duration) def generate_i2v(image, prompt, steps, guidance_scale, num_frames, fps, seed, randomize_seed): request_start = time.perf_counter() requested_duration = get_i2v_duration(image, prompt, steps, guidance_scale, num_frames, fps, seed, randomize_seed) print( "[TIMING][i2v] request_started " f"duration_budget={requested_duration}s " f"steps={int(steps)} guidance={float(guidance_scale):.2f} " f"frames={int(num_frames)} fps={int(fps)}" ) if image is None: return None, seed, "Failed: please provide an input image." if randomize_seed: seed = random.randint(0, MAX_SEED) load_stage_start = time.perf_counter() i2v = _get_i2v_pipe() load_stage_seconds = time.perf_counter() - load_stage_start print(f"[TIMING][i2v] load_or_reuse_seconds={load_stage_seconds:.2f}") generator = torch.Generator(device=device).manual_seed(int(seed)) if isinstance(image, np.ndarray): image = Image.fromarray(image).convert("RGB") elif isinstance(image, Image.Image): image = image.convert("RGB") try: infer_start = time.perf_counter() output = i2v( image=image, prompt=prompt, num_inference_steps=int(steps), guidance_scale=float(guidance_scale), num_frames=int(num_frames), generator=generator, height=480, width=720, ).frames[0] infer_seconds = time.perf_counter() - infer_start print(f"[TIMING][i2v] inference_seconds={infer_seconds:.2f}") export_start = time.perf_counter() with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: export_to_video(output, tmp.name, fps=int(fps)) export_seconds = time.perf_counter() - export_start total_seconds = time.perf_counter() - request_start print( "[TIMING][i2v] " f"export_seconds={export_seconds:.2f} total_seconds={total_seconds:.2f}" ) return tmp.name, seed, ( f"Success. load/reuse={load_stage_seconds:.2f}s, " f"infer={infer_seconds:.2f}s, export={export_seconds:.2f}s, total={total_seconds:.2f}s" ) except Exception as e: total_seconds = time.perf_counter() - request_start print(f"[TIMING][i2v] failed_after_seconds={total_seconds:.2f}") return None, seed, f"Failed after {total_seconds:.2f}s: {type(e).__name__}: {e}" # ============================================================ # VAE tiling toggle (UI-controlled; OFF by default) # ============================================================ def _apply_vae_tiling(enabled: bool): """ Toggle VAE tiling on the global pipeline. This does NOT require a Space restart; it applies to the next pipe(...) call. Note: this is global process state, so concurrent users could flip it between runs. """ try: edit_pipe = _get_edit_pipe() if enabled: if hasattr(edit_pipe, "enable_vae_tiling"): edit_pipe.enable_vae_tiling() print("βœ… VAE tiling ENABLED (per UI).") elif hasattr(edit_pipe, "vae") and hasattr(edit_pipe.vae, "enable_tiling"): edit_pipe.vae.enable_tiling() print("βœ… VAE tiling ENABLED via edit_pipe.vae.enable_tiling() (per UI).") else: print("⚠️ No enable_vae_tiling()/vae.enable_tiling() found; cannot enable.") else: if hasattr(edit_pipe, "disable_vae_tiling"): edit_pipe.disable_vae_tiling() print("πŸ›‘ VAE tiling DISABLED (per UI).") elif hasattr(edit_pipe, "vae") and hasattr(edit_pipe.vae, "disable_tiling"): edit_pipe.vae.disable_tiling() print("πŸ›‘ VAE tiling DISABLED via edit_pipe.vae.disable_tiling() (per UI).") else: print("⚠️ No disable_vae_tiling()/vae.disable_tiling() found; leaving current state unchanged.") except Exception as e: print(f"⚠️ VAE tiling toggle failed: {e}") # ============================================================ # Derived conditioning (Transformers): Depth # ============================================================ # Depth uses Depth Anything V2 Small (Transformers-compatible): # https://huggingface.co/depth-anything/Depth-Anything-V2-Small-hf DEPTH_MODEL_ID = "depth-anything/Depth-Anything-V2-Small-hf" # Lazy cache keyed by device string ("cpu" / "cuda") _DEPTH_CACHE = {} def _derived_device(use_gpu: bool) -> torch.device: return torch.device("cuda" if (use_gpu and torch.cuda.is_available()) else "cpu") def _load_depth_models(dev: torch.device): key = str(dev) if key in _DEPTH_CACHE: return _DEPTH_CACHE[key] proc = AutoImageProcessor.from_pretrained(DEPTH_MODEL_ID) model = AutoModelForDepthEstimation.from_pretrained(DEPTH_MODEL_ID).to(dev) model.eval() _DEPTH_CACHE[key] = (proc, model) return _DEPTH_CACHE[key] @torch.inference_mode() def make_depth_map(img: Image.Image, *, use_gpu: bool) -> Image.Image: dev = _derived_device(use_gpu) proc, model = _load_depth_models(dev) w, h = img.size inputs = proc(images=img.convert("RGB"), return_tensors="pt").to(dev) outputs = model(**inputs) predicted = outputs.predicted_depth # [B, H, W] depth = torch.nn.functional.interpolate( predicted.unsqueeze(1), size=(h, w), mode="bicubic", align_corners=False, ).squeeze(1)[0] depth = depth - depth.min() depth = depth / (depth.max() + 1e-8) depth = (depth * 255.0).clamp(0, 255).to(torch.uint8).cpu().numpy() return Image.fromarray(depth).convert("RGB") # ============================================================ # LoRA adapters + presets # ============================================================ NONE_LORA = "None" ADAPTER_SPECS = { "Consistance": { "type": "single", "repo": "Pr0f3ssi0n4ln00b/QIE_2511_Consistency_Lora", "weights": "qe2511_consis_alpha_patched.safetensors", "adapter_name": "Consistency", "strength": 0.6, }, "Semirealistic-photo-detailer": { "type": "single", "repo": "rzgar/Qwen-Image-Edit-semi-realistic-detailer", "weights": "Qwen-Image-Edit-Anime-Semi-Realistic-Detailer-v1.safetensors", "adapter_name": "semirealistic", "strength": 1.0, }, "AnyPose": { "type": "package", "requires_two_images": True, "image2_label": "Upload Pose Reference (Image 2)", "parts": [ { "repo": "lilylilith/AnyPose", "weights": "2511-AnyPose-base-000006250.safetensors", "adapter_name": "anypose-base", "strength": 0.7, }, { "repo": "lilylilith/AnyPose", "weights": "2511-AnyPose-helper-00006000.safetensors", "adapter_name": "anypose-helper", "strength": 0.7, }, ], }, "Any2Real_2601": { "type": "single", "repo": "lrzjason/Anything2Real_2601", "weights": "anything2real_2601_A_final_patched.safetensors", "adapter_name": "photoreal", "strength": 1.0, }, "Hyperrealistic-Portrait": { "type": "single", "repo": "prithivMLmods/Qwen-Image-Edit-2511-Hyper-Realistic-Portrait", "weights": "HRP_20.safetensors", "adapter_name": "HRPortrait", "strength": 1.0, }, "Ultrarealistic-Portrait": { "type": "single", "repo": "prithivMLmods/Qwen-Image-Edit-2511-Ultra-Realistic-Portrait", "weights": "URP_20.safetensors", "adapter_name": "URPortrait", "strength": 1.0, }, "BFS-Best-FaceSwap": { "type": "single", "requires_two_images": True, "image2_label": "Upload Head/Face Donor (Image 2)", "repo": "Alissonerdx/BFS-Best-Face-Swap", "weights": "bfs_head_v5_2511_original.safetensors", "adapter_name": "BFS-Best-Faceswap", "strength": 1.0, "needs_alpha_fix": True, }, "BFS-Best-FaceSwap-merge": { "type": "single", "requires_two_images": True, "image2_label": "Upload Head/Face Donor (Image 2)", "repo": "Alissonerdx/BFS-Best-Face-Swap", "weights": "bfs_head_v5_2511_merged_version_rank_32_fp32.safetensors", "adapter_name": "BFS-Best-Faceswap-merge", "strength": 1.1, "needs_alpha_fix": True, }, "F2P": { "type": "single", "repo": "DiffSynth-Studio/Qwen-Image-Edit-F2P", "weights": "edit_0928_lora_step40000.safetensors", "adapter_name": "F2P", "strength": 1.0, }, "Multiple-Angles": { "type": "single", "repo": "dx8152/Qwen-Edit-2509-Multiple-angles", "weights": "ι•œε€΄θ½¬ζ’.safetensors", "adapter_name": "multiple-angles", "strength": 1.0, }, "Light-Restoration": { "type": "single", "repo": "dx8152/Qwen-Image-Edit-2509-Light_restoration", "weights": "移陀光影.safetensors", "adapter_name": "light-restoration", "strength": 1.0, }, "Relight": { "type": "single", "repo": "dx8152/Qwen-Image-Edit-2509-Relight", "weights": "Qwen-Edit-Relight.safetensors", "adapter_name": "relight", "strength": 1.0, }, "Multi-Angle-Lighting": { "type": "single", "repo": "dx8152/Qwen-Edit-2509-Multi-Angle-Lighting", "weights": "ε€šθ§’εΊ¦η―ε…‰-251116.safetensors", "adapter_name": "multi-angle-lighting", "strength": 1.0, }, "Edit-Skin": { "type": "single", "repo": "tlennon-ie/qwen-edit-skin", "weights": "qwen-edit-skin_1.1_000002750.safetensors", "adapter_name": "edit-skin", "strength": 1.0, }, "Next-Scene": { "type": "single", "repo": "lovis93/next-scene-qwen-image-lora-2509", "weights": "next-scene_lora-v2-3000.safetensors", "adapter_name": "next-scene", "strength": 1.0, }, "Flat-Log": { "type": "single", "repo": "tlennon-ie/QwenEdit2509-FlatLogColor", "weights": "QwenEdit2509-FlatLogColor.safetensors", "adapter_name": "flat-log", "strength": 1.0, }, "Upscale-Image": { "type": "single", "repo": "vafipas663/Qwen-Edit-2509-Upscale-LoRA", "weights": "qwen-edit-enhance_64-v3_000001000.safetensors", "adapter_name": "upscale-image", "strength": 1.0, }, "Upscale2K": { "type": "single", "repo": "valiantcat/Qwen-Image-Edit-2509-Upscale2K", "weights": "qwen_image_edit_2509_upscale.safetensors", "adapter_name": "upscale-2k", "strength": 1.0, "target_long_edge": 2048, }, } LORA_PRESET_PROMPTS = { "Any2Real_2601": "change the picture 1 to realistic photograph", "Semirealistic-photo-detailer": "transform the image to semi-realistic image", "AnyPose": "Make the person in image 1 do the exact same pose of the person in image 2. Changing the style and background of the image of the person in image 1 is undesirable, so don't do it. The new pose should be pixel accurate to the pose we are trying to copy. The position of the arms and head and legs should be the same as the pose we are trying to copy. Change the field of view and angle to match exactly image 2. Head tilt and eye gaze pose should match the person in image 2.", "Hyperrealistic-Portrait": "Transform the image into an ultra-realistic photorealistic portrait with strict identity preservation, facing straight to the camera. Enhance pore-level skin textures, realistic moisture effects, and natural wet hair clumping against the skin. Apply cool-toned soft-box lighting with subtle highlights and shadows, maintain realistic green-hazel eye catchlights without synthetic gloss, and preserve soft natural lip texture. Use shallow depth of field with a clean bokeh background, an 85mm macro photographic look, and raw photo grading without retouching to maintain realism and original details.", "Ultrarealistic-Portrait": "Transform the image into an ultra-realistic glamour portrait while strictly preserving the subject's identity. Apply a close-up composition with a slight head tilt and a hand near the face, enhance cinematic directional lighting with dramatic fashion-style highlights, and refine makeup details including glowing skin, glossy lips, luminous highlighter, and defined eyes. Increase skin realism with detailed epidermal textures such as micropores, microhairs, subtle oil sheen, natural highlights, soft wrinkles, and subsurface scattering. Maintain a luxury fashion-magazine look in a 9:16 aspect ratio, preserving realism, facial structure, and original details without over-smoothing or retouching.", "Upscale2K": "Upscale this picture to 4K resolution.", "BFS-Best-FaceSwap": "head_swap: start with Picture 1 as the base image, keeping its lighting, environment, and background. remove the head from Picture 1 completely and replace it with the head from Picture 2, strictly preserving the hair, eye color, and nose structure of Picture 2. copy the eye direction, head rotation, and micro-expressions from Picture 1. high quality, sharp details, 4k", "BFS-Best-FaceSwap-merge": "head_swap: start with Picture 1 as the base image, keeping its lighting, environment, and background. remove the head from Picture 1 completely and replace it with the head from Picture 2, strictly preserving the hair, eye color, and nose structure of Picture 2. copy the eye direction, head rotation, and micro-expressions from Picture 1. high quality, sharp details, 4k", } # Track what is currently loaded in memory (adapter_name values) LOADED_ADAPTERS = set() # ============================================================ # Helpers: resolution # ============================================================ def _round_to_multiple(x: int, m: int) -> int: return max(m, (int(x) // m) * m) def compute_canvas_dimensions_from_area( image: Image.Image, target_area: int, multiple_of: int, ) -> tuple[int, int]: """Compute (width, height) that matches image aspect ratio and approximates target_area.""" w, h = image.size aspect = w / h if h else 1.0 from qwenimage.pipeline_qwenimage_edit_plus import calculate_dimensions width, height = calculate_dimensions(int(target_area), float(aspect)) width = _round_to_multiple(int(width), int(multiple_of)) height = _round_to_multiple(int(height), int(multiple_of)) return width, height def get_target_area_for_lora( image: Image.Image, lora_adapter: str, user_target_megapixels: float, ) -> int: """Return target pixel area for the canvas.""" spec = ADAPTER_SPECS.get(lora_adapter, {}) if "target_area" in spec: try: return int(spec["target_area"]) except Exception: pass # Legacy support (e.g. Upscale2K) if "target_long_edge" in spec: try: long_edge = int(spec["target_long_edge"]) w, h = image.size if w >= h: new_w = long_edge new_h = int(round(long_edge * (h / w))) else: new_h = long_edge new_w = int(round(long_edge * (w / h))) return int(new_w * new_h) except Exception: pass try: mp = float(user_target_megapixels) except Exception: mp = 1.0 # Treat 0 MP as "match input area" if mp <= 0: w, h = image.size return int(w * h) return int(mp * 1024 * 1024) # ============================================================ # Helpers: multi-input routing + gallery normalization # ============================================================ def lora_requires_two_images(lora_adapter: str) -> bool: return bool(ADAPTER_SPECS.get(lora_adapter, {}).get("requires_two_images", False)) def image2_label_for_lora(lora_adapter: str) -> str: return str(ADAPTER_SPECS.get(lora_adapter, {}).get("image2_label", "Upload Reference (Image 2)")) def _to_pil_rgb(x) -> Optional[Image.Image]: """ Accepts PIL / numpy / (image, caption) tuples from gr.Gallery and returns PIL RGB. """ if x is None: return None # Gallery often returns (image, caption) if isinstance(x, tuple) and len(x) >= 1: x = x[0] if x is None: return None if isinstance(x, Image.Image): return x.convert("RGB") if isinstance(x, np.ndarray): return Image.fromarray(x).convert("RGB") try: return Image.fromarray(np.array(x)).convert("RGB") except Exception: return None def build_labeled_images( img1: Image.Image, img2: Optional[Image.Image], extra_imgs: Optional[list[Image.Image]], ) -> dict[str, Image.Image]: """ Creates labels image_1, image_2, image_3... based on what is actually uploaded. """ labeled: dict[str, Image.Image] = {} idx = 1 labeled[f"image_{idx}"] = img1 idx += 1 if img2 is not None: labeled[f"image_{idx}"] = img2 idx += 1 if extra_imgs: for im in extra_imgs: if im is None: continue labeled[f"image_{idx}"] = im idx += 1 return labeled def _append_to_gallery(existing, new_img) -> list[Image.Image]: """Append new_img to an existing gallery list.""" items: list[Image.Image] = [] if existing: for item in existing: pil = _to_pil_rgb(item) if pil is not None: items.append(pil) if new_img is not None: pil = _to_pil_rgb(new_img) if pil is not None: items.append(pil) return items # ============================================================ # Helpers: BFS alpha key fix # ============================================================ def _inject_missing_alpha_keys(state_dict: dict) -> dict: """ Diffusers' Qwen LoRA converter expects '.alpha' keys. BFS safetensors omits them. We inject alpha = rank (neutral scaling). """ bases = {} for k, v in state_dict.items(): if not isinstance(v, torch.Tensor): continue if k.endswith(".lora_down.weight") and v.ndim >= 1: base = k[: -len(".lora_down.weight")] rank = int(v.shape[0]) bases[base] = rank for base, rank in bases.items(): alpha_tensor = torch.tensor(float(rank), dtype=torch.float32) full_alpha = f"{base}.alpha" if full_alpha not in state_dict: state_dict[full_alpha] = alpha_tensor if base.startswith("diffusion_model."): stripped_base = base[len("diffusion_model."):] stripped_alpha = f"{stripped_base}.alpha" if stripped_alpha not in state_dict: state_dict[stripped_alpha] = alpha_tensor return state_dict def _filter_to_diffusers_lora_keys(state_dict: dict) -> tuple[dict, dict]: """Return (filtered_state_dict, stats). Keeps only LoRA keys Diffusers can consume.""" keep_suffixes = ( ".lora_up.weight", ".lora_down.weight", ".lora_mid.weight", ".alpha", ".lora_alpha", ) dropped_patch = 0 dropped_other = 0 kept = 0 normalized_alpha = 0 out: dict[str, torch.Tensor] = {} for k, v in state_dict.items(): if not isinstance(v, torch.Tensor): dropped_other += 1 continue if k.endswith(".diff") or k.endswith(".diff_b"): dropped_patch += 1 continue if not k.endswith(keep_suffixes): dropped_other += 1 continue if k.endswith(".lora_alpha"): base = k[: -len(".lora_alpha")] k2 = f"{base}.alpha" out[k2] = v.float() if v.dtype != torch.float32 else v normalized_alpha += 1 kept += 1 continue out[k] = v kept += 1 stats = { "kept": kept, "dropped_patch": dropped_patch, "dropped_other": dropped_other, "normalized_alpha": normalized_alpha, } return out, stats def _duplicate_stripped_prefix_keys(state_dict: dict, prefix: str = "diffusion_model.") -> dict: """Ensure both prefixed and unprefixed variants exist for LoRA-related keys.""" out = dict(state_dict) for k, v in list(state_dict.items()): if not k.startswith(prefix): continue stripped = k[len(prefix):] if stripped not in out: out[stripped] = v return out def _load_lora_weights_with_fallback(repo: str, weight_name: str, adapter_name: str, needs_alpha_fix: bool = False): """ Normal path: pipe.load_lora_weights(repo, weight_name=..., adapter_name=...) BFS fallback: download safetensors, inject missing alpha keys, then load from dict. """ edit_pipe = _get_edit_pipe() try: edit_pipe.load_lora_weights(repo, weight_name=weight_name, adapter_name=adapter_name) return except (KeyError, ValueError) as e: if not needs_alpha_fix: raise print( "⚠️ LoRA load failed (will try safe dict fallback). " f"Adapter={adapter_name!r} file={weight_name!r} error={type(e).__name__}: {e}" ) local_path = hf_hub_download(repo_id=repo, filename=weight_name) sd = safetensors_load_file(local_path) sd = _inject_missing_alpha_keys(sd) sd, stats = _filter_to_diffusers_lora_keys(sd) sd = _duplicate_stripped_prefix_keys(sd) print( "🧹 LoRA dict cleanup stats: " f"kept={stats['kept']} dropped_patch={stats['dropped_patch']} " f"dropped_other={stats['dropped_other']} normalized_alpha={stats['normalized_alpha']}" ) edit_pipe.load_lora_weights(sd, adapter_name=adapter_name) return # ============================================================ # LoRA loader: single/package + strengths # ============================================================ def _ensure_loaded_and_get_active_adapters(selected_lora: str): spec = ADAPTER_SPECS.get(selected_lora) if not spec: raise gr.Error(f"Configuration not found for: {selected_lora}") adapter_names = [] adapter_weights = [] if spec.get("type") == "package": parts = spec.get("parts", []) if not parts: raise gr.Error(f"Package spec has no parts: {selected_lora}") for part in parts: repo = part["repo"] weights = part["weights"] adapter_name = part["adapter_name"] strength = float(part.get("strength", 1.0)) needs_alpha_fix = bool(part.get("needs_alpha_fix", False)) if adapter_name not in LOADED_ADAPTERS: print(f"--- Downloading and Loading Adapter Part: {selected_lora} / {adapter_name} ---") try: _load_lora_weights_with_fallback( repo=repo, weight_name=weights, adapter_name=adapter_name, needs_alpha_fix=needs_alpha_fix, ) LOADED_ADAPTERS.add(adapter_name) except Exception as e: raise gr.Error(f"Failed to load adapter part {selected_lora}/{adapter_name}: {e}") else: print(f"--- Adapter part already loaded: {selected_lora} / {adapter_name} ---") adapter_names.append(adapter_name) adapter_weights.append(strength) else: repo = spec["repo"] weights = spec["weights"] adapter_name = spec["adapter_name"] strength = float(spec.get("strength", 1.0)) needs_alpha_fix = bool(spec.get("needs_alpha_fix", False)) if adapter_name not in LOADED_ADAPTERS: print(f"--- Downloading and Loading Adapter: {selected_lora} ---") try: _load_lora_weights_with_fallback( repo=repo, weight_name=weights, adapter_name=adapter_name, needs_alpha_fix=needs_alpha_fix, ) LOADED_ADAPTERS.add(adapter_name) except Exception as e: raise gr.Error(f"Failed to load adapter {selected_lora}: {e}") else: print(f"--- Adapter {selected_lora} is already loaded. ---") adapter_names = [adapter_name] adapter_weights = [strength] return adapter_names, adapter_weights # ============================================================ # UI handlers # ============================================================ def on_lora_change_ui(selected_lora, current_prompt, current_extras_condition_only): # Preset prompt (fill only if empty) if selected_lora != NONE_LORA: preset = LORA_PRESET_PROMPTS.get(selected_lora, "") if preset and (current_prompt is None or str(current_prompt).strip() == ""): prompt_update = gr.update(value=preset) else: prompt_update = gr.update(value=current_prompt) else: prompt_update = gr.update(value=current_prompt) # Image2 visibility/label if lora_requires_two_images(selected_lora): img2_update = gr.update(visible=True, label=image2_label_for_lora(selected_lora)) else: img2_update = gr.update(visible=False, value=None, label="Upload Reference (Image 2)") # Extra references routing default if selected_lora in ("BFS-Best-FaceSwap", "BFS-Best-FaceSwap-merge", "AnyPose"): extras_update = gr.update(value=True) else: extras_update = gr.update(value=current_extras_condition_only) return prompt_update, img2_update, extras_update def set_output_as_image1(last): if last is None: raise gr.Error("No output available yet.") return gr.update(value=last) def set_output_as_image2(last): if last is None: raise gr.Error("No output available yet.") return gr.update(value=last) def set_output_as_extra(last, existing_extra): if last is None: raise gr.Error("No output available yet.") return _append_to_gallery(existing_extra, last) @spaces.GPU def add_derived_ref(img1, existing_extra, derived_type, derived_use_gpu): if img1 is None: raise gr.Error("Please upload Image 1 first.") if derived_type == "None": return gr.update(value=existing_extra), gr.update(visible=False, value=None) base = img1.convert("RGB") if derived_type == "Depth (Depth Anything V2 Small)": derived = make_depth_map(base, use_gpu=bool(derived_use_gpu)) else: raise gr.Error(f"Unknown derived type: {derived_type}") new_gallery = _append_to_gallery(existing_extra, derived) return gr.update(value=new_gallery), gr.update(visible=True, value=derived) # ============================================================ # Inference # ============================================================ @spaces.GPU def infer( input_image_1, input_image_2, input_images_extra, prompt, lora_adapter, seed, randomize_seed, guidance_scale, steps, target_megapixels, extras_condition_only, pad_to_canvas, vae_tiling, resolution_multiple, vae_ref_megapixels, decoder_vae, keep_decoder_2x, progress=gr.Progress(track_tqdm=True), ): gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() if input_image_1 is None: raise gr.Error("Please upload Image 1.") edit_pipe = _get_edit_pipe() # Handle "None" if lora_adapter == NONE_LORA: try: edit_pipe.set_adapters([], adapter_weights=[]) except Exception: if LOADED_ADAPTERS: edit_pipe.set_adapters(list(LOADED_ADAPTERS), adapter_weights=[0.0] * len(LOADED_ADAPTERS)) else: adapter_names, adapter_weights = _ensure_loaded_and_get_active_adapters(lora_adapter) edit_pipe.set_adapters(adapter_names, adapter_weights=adapter_weights) if randomize_seed: seed = random.randint(0, MAX_SEED) generator = torch.Generator(device=device).manual_seed(seed) negative_prompt = ( "worst quality, low quality, bad anatomy, bad hands, text, error, missing fingers, " "extra digit, fewer digits, cropped, jpeg artifacts, signature, watermark, username, blurry" ) img1 = input_image_1.convert("RGB") img2 = input_image_2.convert("RGB") if input_image_2 is not None else None # Normalize extra images (Gallery) to PIL RGB extra_imgs: list[Image.Image] = [] if input_images_extra: for item in input_images_extra: pil = _to_pil_rgb(item) if pil is not None: extra_imgs.append(pil) if lora_requires_two_images(lora_adapter) and img2 is None: raise gr.Error("This LoRA needs two images. Please upload Image 2 as well.") labeled = build_labeled_images(img1, img2, extra_imgs) pipe_images = list(labeled.values()) if len(pipe_images) == 1: pipe_images = pipe_images[0] target_area = get_target_area_for_lora(img1, lora_adapter, float(target_megapixels)) width, height = compute_canvas_dimensions_from_area( img1, target_area=target_area, multiple_of=int(resolution_multiple), ) vae_image_indices = None if extras_condition_only: if isinstance(pipe_images, list) and len(pipe_images) > 2: vae_image_indices = [0, 1] if len(pipe_images) >= 2 else [0] try: print( "[DEBUG][infer] submitting request | " f"lora_adapter={lora_adapter!r} seed={seed} prompt={prompt!r}" ) print(f"[DEBUG][infer] canvas={width}x{height} (~{(width*height)/1_048_576:.3f} MP) vae_tiling={bool(vae_tiling)}") res_mult = int(resolution_multiple) if resolution_multiple is not None else int(edit_pipe.vae_scale_factor * 2) try: mp_ref = float(vae_ref_megapixels) except Exception: mp_ref = 0.0 vae_ref_area = int(mp_ref * 1024 * 1024) if mp_ref and mp_ref > 0 else None base_ref_count = 2 if img2 is not None else 1 _apply_vae_tiling(bool(vae_tiling)) result = edit_pipe( image=pipe_images, prompt=prompt, negative_prompt=negative_prompt, height=height, width=width, num_inference_steps=steps, generator=generator, true_cfg_scale=guidance_scale, vae_image_indices=vae_image_indices, pad_to_canvas=bool(pad_to_canvas), resolution_multiple=res_mult, vae_ref_area=vae_ref_area, vae_ref_start_index=base_ref_count, decoder_vae=str(decoder_vae).lower(), keep_decoder_2x=bool(keep_decoder_2x), ).images[0] return result, seed, result finally: gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() @spaces.GPU def infer_example(input_image, prompt, lora_adapter): if input_image is None: return None, 0, None input_pil = input_image.convert("RGB") result, seed, last = infer( input_pil, None, None, prompt, lora_adapter, 0, True, 1.0, # guidance_scale 4, # steps 1.0, # target_megapixels True, # extras_condition_only True, # pad_to_canvas False, # vae_tiling 32, # resolution_multiple 0.0, # vae_ref_megapixels "qwen", # decoder_vae False, # keep_decoder_2x ) return result, seed, last # ============================================================ # UI # ============================================================ css = """ #col-container { margin: 0 auto; max-width: 1100px; } #main-title h1 {font-size: 2.1em !important;} """ aio_status_line = ( f"**AIO transformer version:** `{AIO_VERSION}` " f"({AIO_VERSION_SOURCE}; env `AIO_VERSION`={_AIO_ENV_RAW!r})" ) with gr.Blocks() as demo: with gr.Column(elem_id="col-container"): gr.Markdown("# **Qwen-Image-Edit + Wan I2V**", elem_id="main-title") gr.Markdown( "πŸ–ΌοΈ **Image Edit** tab: Qwen-Image-Edit-2511 with LoRAs and advanced controls\n\n" "πŸŽ₯ **Image to Video** tab: Wan I2V β€” generate video from your edited image" ) gr.Markdown(aio_status_line) with gr.Tabs(): # ===================== IMAGE EDIT TAB ===================== with gr.TabItem("πŸ–ΌοΈ Image Edit"): with gr.Row(equal_height=True): with gr.Column(): input_image_1 = gr.Image(label="Upload Image 1 (Base / Target)", type="pil", height=290) input_image_2 = gr.Image(label="Upload Reference (Image 2)", type="pil", height=290, visible=False) input_images_extra = gr.Gallery( label="Upload Additional Images (auto-indexed after Image 1/2)", type="pil", height=290, columns=4, rows=2, interactive=True, ) prompt = gr.Text( label="Edit Prompt", show_label=True, placeholder="e.g., transform into photo..", ) run_button = gr.Button("Edit Image", variant="primary") with gr.Column(): output_image = gr.Image(label="Output Image", interactive=False, format="png", height=353) last_output = gr.State(value=None) with gr.Row(): btn_out_to_img1 = gr.Button("⬅️ Output β†’ Image 1", variant="secondary") btn_out_to_img2 = gr.Button("⬅️ Output β†’ Image 2", variant="secondary") btn_out_to_extra = gr.Button("βž• Output β†’ Extra Ref", variant="secondary") derived_preview = gr.Image( label="Derived Conditioning Preview", interactive=False, format="png", height=200, visible=False, ) with gr.Row(): lora_choices = [NONE_LORA] + list(ADAPTER_SPECS.keys()) lora_adapter = gr.Dropdown( label="Choose Editing Style", choices=lora_choices, value=NONE_LORA, ) with gr.Accordion("Advanced Settings", open=False, visible=True): with gr.Accordion("Derived Conditioning (Pose / Depth)", open=False): derived_type = gr.Dropdown( label="Derived Type (from Image 1)", choices=["None", "Depth (Depth Anything V2 Small)"], value="None", ) derived_use_gpu = gr.Checkbox(label="Use GPU for derived model", value=False) add_derived_btn = gr.Button("βž• Add derived ref to Extras (conditioning-only recommended)") seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0) randomize_seed = gr.Checkbox(label="Randomize Seed", value=True) guidance_scale = gr.Slider(label="Guidance Scale", minimum=1.0, maximum=10.0, step=0.1, value=1.0) steps = gr.Slider(label="Inference Steps", minimum=1, maximum=50, step=1, value=4) target_megapixels = gr.Slider( label="Target Megapixels (canvas, 0 = match input area)", minimum=0.0, maximum=6.0, step=0.1, value=1.0, ) resolution_multiple = gr.Dropdown( label="Resolution lattice multiple (anti-drift)", choices=[32, 56, 112], value=32, interactive=True, ) vae_ref_megapixels = gr.Slider( label="Extra refs VAE megapixels override (0 = use canvas)", minimum=0.0, maximum=6.0, step=0.1, value=0.0, ) decoder_vae = gr.Dropdown( label="Decoder VAE", choices=["qwen", "wan2x"], value="qwen", interactive=True, ) keep_decoder_2x = gr.Checkbox( label="Keep 2Γ— output (wan2x only)", value=False, ) extras_condition_only = gr.Checkbox( label="Extra references are conditioning-only (exclude from VAE)", value=True, ) pad_to_canvas = gr.Checkbox( label="Pad images to canvas aspect (avoid warping)", value=True, ) vae_tiling = gr.Checkbox( label="VAE tiling (lower VRAM, slower)", value=False, ) # On LoRA selection: preset prompt + toggle Image 2 lora_adapter.change( fn=on_lora_change_ui, inputs=[lora_adapter, prompt, extras_condition_only], outputs=[prompt, input_image_2, extras_condition_only], ) gr.Examples( examples=[ ["examples/5.jpg", "Remove shadows and relight the image using soft lighting.", "Light-Restoration"], ["examples/4.jpg", "Use a subtle golden-hour filter with smooth light diffusion.", "Relight"], ["examples/2.jpeg", "Rotate the camera 45 degrees to the left.", "Multiple-Angles"], [ "examples/12.jpg", "flatcolor Desaturate the image and lower the contrast to create a flat, ungraded look similar to a camera log profile. Preserve details in the highlights and shadows.", "Flat-Log", ], ["examples/7.jpg", "Light source from the Right Rear", "Multi-Angle-Lighting"], ["examples/10.jpeg", "Upscale the image.", "Upscale-Image"], ["examples/7.jpg", "Light source from the Below", "Multi-Angle-Lighting"], ["examples/2.jpeg", "Switch the camera to a top-down right corner view.", "Multiple-Angles"], [ "examples/9.jpg", "The camera moves slightly forward as sunlight breaks through the clouds, casting a soft glow around the character's silhouette in the mist. Realistic cinematic style, atmospheric depth.", "Next-Scene", ], ["examples/8.jpg", "Make the subjects skin details more prominent and natural.", "Edit-Skin"], ["examples/6.jpg", "Switch the camera to a bottom-up view.", "Multiple-Angles"], ["examples/6.jpg", "Rotate the camera 180 degrees upside down.", "Multiple-Angles"], ["examples/4.jpg", "Rotate the camera 45 degrees to the right.", "Multiple-Angles"], ["examples/4.jpg", "Switch the camera to a top-down view.", "Multiple-Angles"], ["examples/4.jpg", "Switch the camera to a wide-angle lens.", "Multiple-Angles"], ["examples/11.jpg", "Upscale this picture to 4K resolution.", "Upscale2K"], ], inputs=[input_image_1, prompt, lora_adapter], outputs=[output_image, seed, last_output], fn=infer_example, cache_examples=False, label="Examples", ) # ===================== IMAGE TO VIDEO TAB ===================== with gr.TabItem("πŸŽ₯ Image to Video"): with gr.Row(): with gr.Column(): i2v_image_input = gr.Image(label="Input Image", type="pil", height=320) i2v_prompt = gr.Textbox( label="Motion Prompt", placeholder="The camera slowly pushes in, cinematic lighting, smooth motion", lines=3, ) btn_use_last = gr.Button("⬆️ Use Last Output From Image Edit", variant="secondary") with gr.Column(): i2v_output_video = gr.Video(label="Generated Video", height=420) i2v_generate_btn = gr.Button("Generate Video", variant="primary", size="large") with gr.Accordion("Video Settings", open=True): with gr.Row(): i2v_prewarm_btn = gr.Button("Load I2V Model (Prewarm)", variant="secondary") i2v_prewarm_status = gr.Textbox(label="I2V Model Status", interactive=False, value=_i2v_prewarm_status) i2v_run_status = gr.Textbox(label="Last Generation Status", interactive=False, value="Not started") with gr.Row(): i2v_steps = gr.Slider(label="Inference Steps", minimum=4, maximum=30, value=4, step=1) i2v_guidance = gr.Slider(label="Guidance Scale", minimum=1.0, maximum=8.0, value=2.5, step=0.1) with gr.Row(): i2v_frames = gr.Slider(label="Number of Frames", minimum=16, maximum=81, value=16, step=1) i2v_fps = gr.Slider(label="FPS", minimum=8, maximum=24, value=12, step=1) i2v_seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, value=42, step=1) i2v_randomize = gr.Checkbox(label="Randomize Seed", value=True) # ===================== Button Connections ===================== run_button.click( fn=infer, inputs=[ input_image_1, input_image_2, input_images_extra, prompt, lora_adapter, seed, randomize_seed, guidance_scale, steps, target_megapixels, extras_condition_only, pad_to_canvas, vae_tiling, resolution_multiple, vae_ref_megapixels, decoder_vae, keep_decoder_2x, ], outputs=[output_image, seed, last_output], ) # Output routing buttons btn_out_to_img1.click(fn=set_output_as_image1, inputs=[last_output], outputs=[input_image_1]) btn_out_to_img2.click(fn=set_output_as_image2, inputs=[last_output], outputs=[input_image_2]) btn_out_to_extra.click(fn=set_output_as_extra, inputs=[last_output, input_images_extra], outputs=[input_images_extra]) # Derived conditioning add_derived_btn.click( fn=add_derived_ref, inputs=[input_image_1, input_images_extra, derived_type, derived_use_gpu], outputs=[input_images_extra, derived_preview], ) # Send last edited image to video tab btn_use_last.click( fn=lambda img: img, inputs=[output_image], outputs=[i2v_image_input], ) # Generate video i2v_generate_btn.click( fn=generate_i2v, inputs=[i2v_image_input, i2v_prompt, i2v_steps, i2v_guidance, i2v_frames, i2v_fps, i2v_seed, i2v_randomize], outputs=[i2v_output_video, i2v_seed, i2v_run_status], ) # Prewarm video model i2v_prewarm_btn.click( fn=prewarm_i2v_model, inputs=[], outputs=[i2v_prewarm_status], ) demo.load( fn=get_i2v_prewarm_status, inputs=[], outputs=[i2v_prewarm_status], ) if __name__ == "__main__": _auto_prewarm_i2v_if_enabled() demo.queue(max_size=20).launch(css=css, theme=orange_red_theme, show_error=True)