
Background

Today's topic leans toward coding techniques, so readers with little or no programming experience may find it somewhat challenging.

The previous article covered, at a fairly high level, how to generate relatively stable video. Yesterday's implementation was relatively simple and relied mainly on the WebUI for generation, but the results were clearly not great: character consistency and motion continuity were both lacking. The reasons are as follows:

1. The Stable Diffusion WebUI's batch image2image mode cannot yet do true image2image.

2. Control comes only from a fixed text prompt plus several ControlNets.

If you want the frames to be stable, you get better quality, coherence, and stability by combining image-based control information with a slightly different prompt for each frame (image2image controls the overall style and content, ControlNet controls the details, and the prompt handles small content differences).
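To make this concrete, the sketch below shows the three control knobs working together: an init image for overall style and content, a ControlNet hint for details, and a per-frame prompt for small content differences. This is only a rough illustration; it assumes a recent diffusers release that ships StableDiffusionControlNetImg2ImgPipeline, the file names are placeholders, and the walkthrough below builds everything from lower-level pieces instead.

import torch
from diffusers import StableDiffusionControlNetImg2ImgPipeline, ControlNetModel
from diffusers.utils import load_image

# Sketch only: assumes a diffusers version that provides StableDiffusionControlNetImg2ImgPipeline.
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny")
pipe = StableDiffusionControlNetImg2ImgPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to("cuda")

init_image = load_image("frame_000.png")           # placeholder: previous frame, sets overall style/content
control_image = load_image("frame_001_canny.png")  # placeholder: edge hint for the new frame, pins the details
frame = pipe(
    prompt="best quality, extremely detailed, a boy running",  # per-frame prompt: small content differences
    image=init_image,
    control_image=control_image,
    strength=0.6,                 # how far the result may drift from the init image
    num_inference_steps=30,
    generator=torch.Generator("cpu").manual_seed(0),
).images[0]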

This article will not go into the specific details of stable image generation. Instead, it covers something more important: how to implement the WebUI's functionality in code, and how to build a more controllable and more efficient image-generation pipeline programmatically.

內(nèi)容

1. Building a Stable Diffusion + ControlNet generation pipeline in code

2. Building a multi-ControlNet generation pipeline

3. How to implement and add functionality that diffusers does not provide

Building a Stable Diffusion + ControlNet generation pipeline in code

Install the diffusers package

pip install --upgrade diffusers accelerate transformers
# To install the latest diffusers from source, run:
pip install git+https://github.com/huggingface/diffusers
pip install controlnet_hinter==0.0.5
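To confirm the installation and check that a GPU is visible, a quick sanity check (not part of the original post):

import diffusers, transformers, torch
print(diffusers.__version__, transformers.__version__, torch.cuda.is_available())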

Build the Stable Diffusion + ControlNet script

from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
import torch
import numpy as np
from PIL import Image
import cv2
import controlnet_hinter  # used below for the pose/depth/segmentation/hough hints

# Load a test image
original_image = load_image("https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_imgvar/input_image_vermeer.png")
original_image

# Set up the ControlNet + Stable Diffusion pipeline
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", cache_dir='./')
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()

# Load a pre-computed Canny edge image
canny_edged_image = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png")
canny_edged_image

# Generate an image with the Canny ControlNet
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality", image=canny_edged_image, num_inference_steps=30, generator=generator).images[0]
image

# Extract Canny edges ourselves, with custom thresholds
control_image = np.array(original_image)
low_threshold = 10    # default = 100
high_threshold = 150  # default = 200
control_image = cv2.Canny(control_image, low_threshold, high_threshold)
control_image = control_image[:, :, None]
control_image = np.concatenate([control_image, control_image, control_image], axis=2)
control_image = Image.fromarray(control_image)
control_image

# Generate with the custom Canny control image
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality", image=control_image, num_inference_steps=30, generator=generator).images[0]
image

# Pose ControlNet pipeline
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose", cache_dir='./')
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()

# Pose image
pose_image = load_image('https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose.png')
pose_image

# Generate with the pose pipeline
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed, football, a boy", negative_prompt="lowres, bad anatomy, worst quality, low quality", image=pose_image, generator=generator, num_inference_steps=30).images[0]
image

# Extract a pose hint from the original image with controlnet_hinter
control_image = controlnet_hinter.hint_openpose(original_image)
control_image

# Generate with the extracted pose hint
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="lowres, bad anatomy, worst quality, low quality", image=control_image, generator=generator, num_inference_steps=30).images[0]
image

# Depth map
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-depth")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()
control_image = controlnet_hinter.hint_depth(original_image)
control_image
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality", image=control_image, generator=generator, num_inference_steps=30).images[0]
image

# Scribble (outline) image
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-scribble")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()
scribble_image = load_image('https://github.com/lllyasviel/ControlNet/raw/main/test_imgs/user_1.png')
scribble_image
generator = torch.Generator(device="cpu").manual_seed(1)
image = pipe(prompt="a turtle, best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality", image=scribble_image, generator=generator, num_inference_steps=30).images[0]
image

# Segmentation
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-seg")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()
control_image = controlnet_hinter.hint_segmentation(original_image)
control_image
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality", image=control_image, generator=generator, num_inference_steps=30).images[0]
image

# Hough lines (MLSD): mostly used for architecture and large scenes
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-mlsd")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()
control_image = controlnet_hinter.hint_hough(original_image)
control_image
generator = torch.Generator(device="cpu").manual_seed(2)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality", image=control_image, generator=generator, num_inference_steps=30).images[0]
image
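The blocks above all repeat the same pattern: load one ControlNet, attach it to the SD 1.5 pipeline, and generate from a hint image. When wiring a longer generation chain it can help to wrap that pattern in a small helper. The sketch below reuses the imports and the original_image from the code above; the function names and defaults are my own, not from the original post.

def build_controlnet_pipe(controlnet_id, cache_dir='./'):
    # Load one ControlNet and attach it to the SD 1.5 text-to-image pipeline.
    controlnet = ControlNetModel.from_pretrained(controlnet_id, cache_dir=cache_dir)
    pipe = StableDiffusionControlNetPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
    ).to('cuda')
    pipe.enable_xformers_memory_efficient_attention()
    return pipe

def generate(pipe, hint_image, prompt,
             negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
             seed=0, steps=30):
    # One generation from a precomputed hint image, with a fixed seed.
    generator = torch.Generator(device="cpu").manual_seed(seed)
    return pipe(prompt=prompt, negative_prompt=negative_prompt, image=hint_image,
                num_inference_steps=steps, generator=generator).images[0]

# Example: the depth pipeline from above, rebuilt with the helper.
depth_pipe = build_controlnet_pipe("lllyasviel/sd-controlnet-depth")
depth_image = generate(depth_pipe, controlnet_hinter.hint_depth(original_image),
                       prompt="best quality, extremely detailed")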

Building a multi-ControlNet generation pipeline

The diffusers package does not yet implement image generation driven by multiple ControlNets. If you need multi-ControlNet control, you can use the following code.

# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
from typing import Any, Callable, Dict, List, Optional, Union, Tuple

import numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer

from diffusers import AutoencoderKL, ControlNetModel, UNet2DConditionModel
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import (
    PIL_INTERPOLATION, is_accelerate_available, is_accelerate_version,
    logging, randn_tensor, replace_example_docstring,
)
from diffusers.pipeline_utils import DiffusionPipeline
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput, StableDiffusionSafetyChecker
from diffusers.models.controlnet import ControlNetOutputlogger = logging.get_logger(__name__)  # pylint: disable=invalid-nameclass ControlNetProcessor(object):def __init__(self,controlnet: ControlNetModel,image: Union[torch.FloatTensor, PIL.Image.Image, List[torch.FloatTensor], List[PIL.Image.Image]],conditioning_scale: float = 1.0,):self.controlnet = controlnetself.image = imageself.conditioning_scale = conditioning_scaledef _default_height_width(self, height, width, image):if isinstance(image, list):image = image[0]if height is None:if isinstance(image, PIL.Image.Image):height = image.heightelif isinstance(image, torch.Tensor):height = image.shape[3]height = (height // 8) * 8  # round down to nearest multiple of 8if width is None:if isinstance(image, PIL.Image.Image):width = image.widthelif isinstance(image, torch.Tensor):width = image.shape[2]width = (width // 8) * 8  # round down to nearest multiple of 8return height, widthdef default_height_width(self, height, width):return self._default_height_width(height, width, self.image)def _prepare_image(self, image, width, height, batch_size, num_images_per_prompt, device, dtype):if not isinstance(image, torch.Tensor):if isinstance(image, PIL.Image.Image):image = [image]if isinstance(image[0], PIL.Image.Image):image = [np.array(i.resize((width, height), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]image = np.concatenate(image, axis=0)image = np.array(image).astype(np.float32) / 255.0image = image.transpose(0, 3, 1, 2)image = torch.from_numpy(image)elif isinstance(image[0], torch.Tensor):image = torch.cat(image, dim=0)image_batch_size = image.shape[0]if image_batch_size == 1:repeat_by = batch_sizeelse:# image batch size is the same as prompt batch sizerepeat_by = num_images_per_promptimage = image.repeat_interleave(repeat_by, dim=0)image = image.to(device=device, dtype=dtype)return imagedef _check_inputs(self, image, prompt, prompt_embeds):image_is_pil = isinstance(image, PIL.Image.Image)image_is_tensor = isinstance(image, torch.Tensor)image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)if not image_is_pil and not image_is_tensor and not image_is_pil_list and not image_is_tensor_list:raise TypeError("image must be passed and be one of PIL image, torch tensor, list of PIL images, or list of torch tensors")if image_is_pil:image_batch_size = 1elif image_is_tensor:image_batch_size = image.shape[0]elif image_is_pil_list:image_batch_size = len(image)elif image_is_tensor_list:image_batch_size = len(image)if prompt is not None and isinstance(prompt, str):prompt_batch_size = 1elif prompt is not None and isinstance(prompt, list):prompt_batch_size = len(prompt)elif prompt_embeds is not None:prompt_batch_size = prompt_embeds.shape[0]if image_batch_size != 1 and image_batch_size != prompt_batch_size:raise ValueError(f"If image batch size is not 1, image batch size must be same as prompt batch size. 
image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}")def check_inputs(self, prompt, prompt_embeds):self._check_inputs(self.image, prompt, prompt_embeds)def prepare_image(self, width, height, batch_size, num_images_per_prompt, device, do_classifier_free_guidance):self.image = self._prepare_image(self.image, width, height, batch_size, num_images_per_prompt, device, self.controlnet.dtype)if do_classifier_free_guidance:self.image = torch.cat([self.image] * 2)def __call__(self,sample: torch.FloatTensor,timestep: Union[torch.Tensor, float, int],encoder_hidden_states: torch.Tensor,class_labels: Optional[torch.Tensor] = None,timestep_cond: Optional[torch.Tensor] = None,attention_mask: Optional[torch.Tensor] = None,cross_attention_kwargs: Optional[Dict[str, Any]] = None,return_dict: bool = True,) -> Tuple:down_block_res_samples, mid_block_res_sample = self.controlnet(sample=sample,controlnet_cond=self.image,timestep=timestep,encoder_hidden_states=encoder_hidden_states,class_labels=class_labels,timestep_cond=timestep_cond,attention_mask=attention_mask,cross_attention_kwargs=cross_attention_kwargs,return_dict=False,)down_block_res_samples = [down_block_res_sample * self.conditioning_scale for down_block_res_sample in down_block_res_samples]mid_block_res_sample *= self.conditioning_scalereturn (down_block_res_samples, mid_block_res_sample)EXAMPLE_DOC_STRING = """Examples:```py>>> # !pip install opencv-python transformers accelerate>>> from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, UniPCMultistepScheduler>>> from diffusers.utils import load_image>>> import numpy as np>>> import torch>>> import cv2>>> from PIL import Image>>> # download an image>>> image = load_image(...     "https://hf.co/datasets/huggingface/documentation-images/resolve/main/diffusers/input_image_vermeer.png"... )>>> image = np.array(image)>>> # get canny image>>> image = cv2.Canny(image, 100, 200)>>> image = image[:, :, None]>>> image = np.concatenate([image, image, image], axis=2)>>> canny_image = Image.fromarray(image)>>> # load control net and stable diffusion v1-5>>> controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)>>> pipe = StableDiffusionControlNetPipeline.from_pretrained(...     "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16... )>>> # speed up diffusion process with faster scheduler and memory optimization>>> pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)>>> # remove following line if xformers is not installed>>> pipe.enable_xformers_memory_efficient_attention()>>> pipe.enable_model_cpu_offload()>>> # generate image>>> generator = torch.manual_seed(0)>>> image = pipe(...     "futuristic-looking woman", num_inference_steps=20, generator=generator, image=canny_image... ).images[0]```
"""class StableDiffusionMultiControlNetPipeline(DiffusionPipeline):r"""Pipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([`AutoencoderKL`]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([`CLIPTextModel`]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (`CLIPTokenizer`):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.scheduler ([`SchedulerMixin`]):A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].safety_checker ([`StableDiffusionSafetyChecker`]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([`CLIPFeatureExtractor`]):Model that extracts features from generated images to be used as inputs for the `safety_checker`."""_optional_components = ["safety_checker", "feature_extractor"]def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool = True,):super().__init__()if safety_checker is None and requires_safety_checker:logger.warning(f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"" results in services or applications open to the public. Both the diffusers team and Hugging Face"" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"" it only for use-cases that involve analyzing network behavior or auditing its results. For more"" information, please have a look at https://github.com/huggingface/diffusers/pull/254 .")if safety_checker is not None and feature_extractor is None:raise ValueError("Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead.")self.register_modules(vae=vae,text_encoder=text_encoder,tokenizer=tokenizer,unet=unet,scheduler=scheduler,safety_checker=safety_checker,feature_extractor=feature_extractor,)self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checker=requires_safety_checker)# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_vae_slicingdef enable_vae_slicing(self):r"""Enable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. 
This is useful to save some memory and allow larger batch sizes."""self.vae.enable_slicing()# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.disable_vae_slicingdef disable_vae_slicing(self):r"""Disable sliced VAE decoding. If `enable_vae_slicing` was previously invoked, this method will go back tocomputing decoding in one step."""self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,text_encoder, vae, controlnet, and safety checker have their state dicts saved to CPU and then are moved to a`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called.Note that offloading happens on a submodule basis. Memory savings are higher than with`enable_model_cpu_offload`, but performance is lower."""if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError("Please install accelerate via `pip install accelerate`")device = torch.device(f"cuda:{gpu_id}")for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)def enable_model_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Comparedto `enable_sequential_cpu_offload`, this method moves one whole model at a time to the GPU when its `forward`method is called, and the model remains in GPU until the next model runs. Memory savings are lower than with`enable_sequential_cpu_offload`, but performance is much better due to the iterative execution of the `unet`."""if is_accelerate_available() and is_accelerate_version(">=", "0.17.0.dev0"):from accelerate import cpu_offload_with_hookelse:raise ImportError("`enable_model_offload` requires `accelerate v0.17.0` or higher.")device = torch.device(f"cuda:{gpu_id}")hook = Nonefor cpu_offloaded_model in [self.text_encoder, self.unet, self.vae]:_, hook = cpu_offload_with_hook(cpu_offloaded_model, device, prev_module_hook=hook)if self.safety_checker is not None:# the safety checker can offload the vae again_, hook = cpu_offload_with_hook(self.safety_checker, device, prev_module_hook=hook)# control net hook has be manually offloaded as it alternates with unet# cpu_offload_with_hook(self.controlnet, device)# We'll offload the last model manually.self.final_offload_hook = hook@property# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_devicedef _execution_device(self):r"""Returns the device on which the pipeline's models will be executed. 
After calling`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's modulehooks."""if not hasattr(self.unet, "_hf_hook"):return self.devicefor module in self.unet.modules():if (hasattr(module, "_hf_hook")and hasattr(module._hf_hook, "execution_device")and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.device# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_promptdef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt=None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,):r"""Encodes the prompt into text encoder hidden states.Args:prompt (`str` or `List[str]`, *optional*):prompt to be encodeddevice: (`torch.device`):torch devicenum_images_per_prompt (`int`):number of images that should be generated per promptdo_classifier_free_guidance (`bool`):whether to use classifier free guidance or notnegative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. 
If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument."""if prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]if prompt_embeds is None:text_inputs = self.tokenizer(prompt,padding="max_length",max_length=self.tokenizer.model_max_length,truncation=True,return_tensors="pt",)text_input_ids = text_inputs.input_idsuntruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_idsif untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):removed_text = self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])logger.warning("The following part of your input was truncated because CLIP can only handle sequences up to"f" {self.tokenizer.model_max_length} tokens: {removed_text}")if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = text_inputs.attention_mask.to(device)else:attention_mask = Noneprompt_embeds = self.text_encoder(text_input_ids.to(device),attention_mask=attention_mask,)prompt_embeds = prompt_embeds[0]prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)bs_embed, seq_len, _ = prompt_embeds.shape# duplicate text embeddings for each generation per prompt, using mps friendly methodprompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)# get unconditional embeddings for classifier free guidanceif do_classifier_free_guidance and negative_prompt_embeds is None:uncond_tokens: List[str]if negative_prompt is None:uncond_tokens = [""] * batch_sizeelif type(prompt) is not type(negative_prompt):raise TypeError(f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="f" {type(prompt)}.")elif isinstance(negative_prompt, str):uncond_tokens = [negative_prompt]elif batch_size != len(negative_prompt):raise ValueError(f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"f" {prompt} has batch size {batch_size}. 
Please make sure that passed `negative_prompt` matches"" the batch size of `prompt`.")else:uncond_tokens = negative_promptmax_length = prompt_embeds.shape[1]uncond_input = self.tokenizer(uncond_tokens,padding="max_length",max_length=max_length,truncation=True,return_tensors="pt",)if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = uncond_input.attention_mask.to(device)else:attention_mask = Nonenegative_prompt_embeds = self.text_encoder(uncond_input.input_ids.to(device),attention_mask=attention_mask,)negative_prompt_embeds = negative_prompt_embeds[0]if do_classifier_free_guidance:# duplicate unconditional embeddings for each generation per prompt, using mps friendly methodseq_len = negative_prompt_embeds.shape[1]negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)# For classifier free guidance, we need to do two forward passes.# Here we concatenate the unconditional and text embeddings into a single batch# to avoid doing two forward passesprompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])return prompt_embeds# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checkerdef run_safety_checker(self, image, device, dtype):if self.safety_checker is not None:safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)image, has_nsfw_concept = self.safety_checker(images=image, clip_input=safety_checker_input.pixel_values.to(dtype))else:has_nsfw_concept = Nonereturn image, has_nsfw_concept# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latentsdef decode_latents(self, latents):latents = 1 / self.vae.config.scaling_factor * latentsimage = self.vae.decode(latents).sampleimage = (image / 2 + 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image = image.cpu().permute(0, 2, 3, 1).float().numpy()return image# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargsdef prepare_extra_step_kwargs(self, generator, eta):# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502# and should be between [0, 1]accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())extra_step_kwargs = {}if accepts_eta:extra_step_kwargs["eta"] = eta# check if the scheduler accepts generatoraccepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())if accepts_generator:extra_step_kwargs["generator"] = generatorreturn extra_step_kwargsdef check_inputs(self,prompt,height,width,callback_steps,negative_prompt=None,prompt_embeds=None,negative_prompt_embeds=None,):if height % 8 != 0 or width % 8 != 0:raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")if (callback_steps is None) or (callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)):raise ValueError(f"`callback_steps` has to be a positive integer 
but is {callback_steps} of type"f" {type(callback_steps)}.")if prompt is not None and prompt_embeds is not None:raise ValueError(f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"" only forward one of the two.")elif prompt is None and prompt_embeds is None:raise ValueError("Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined.")elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")if negative_prompt is not None and negative_prompt_embeds is not None:raise ValueError(f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"f" {negative_prompt_embeds}. Please make sure to only forward one of the two.")if prompt_embeds is not None and negative_prompt_embeds is not None:if prompt_embeds.shape != negative_prompt_embeds.shape:raise ValueError("`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"f" {negative_prompt_embeds.shape}.")# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latentsdef prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)if isinstance(generator, list) and len(generator) != batch_size:raise ValueError(f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"f" size of {batch_size}. Make sure the batch size matches the length of the generators.")if latents is None:latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)else:latents = latents.to(device)# scale the initial noise by the standard deviation required by the schedulerlatents = latents * self.scheduler.init_noise_sigmareturn latents@torch.no_grad()@replace_example_docstring(EXAMPLE_DOC_STRING)def __call__(self,processors: List[ControlNetProcessor],prompt: Union[str, List[str]] = None,height: Optional[int] = None,width: Optional[int] = None,num_inference_steps: int = 50,guidance_scale: float = 7.5,negative_prompt: Optional[Union[str, List[str]]] = None,num_images_per_prompt: Optional[int] = 1,eta: float = 0.0,generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,latents: Optional[torch.FloatTensor] = None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,output_type: Optional[str] = "pil",return_dict: bool = True,callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,callback_steps: int = 1,cross_attention_kwargs: Optional[Dict[str, Any]] = None,):r"""Function invoked when calling the pipeline for generation.Args:prompt (`str` or `List[str]`, *optional*):The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.instead.height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The height in pixels of the generated image.width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The width in pixels of the generated image.num_inference_steps (`int`, *optional*, defaults to 50):The number of denoising steps. 
More denoising steps usually lead to a higher quality image at theexpense of slower inference.guidance_scale (`float`, *optional*, defaults to 7.5):Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).`guidance_scale` is defined as `w` of equation 2. of [ImagenPaper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,usually at the expense of lower image quality.negative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).num_images_per_prompt (`int`, *optional*, defaults to 1):The number of images to generate per prompt.eta (`float`, *optional*, defaults to 0.0):Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to[`schedulers.DDIMScheduler`], will be ignored for others.generator (`torch.Generator` or `List[torch.Generator]`, *optional*):One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)to make generation deterministic.latents (`torch.FloatTensor`, *optional*):Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for imagegeneration. Can be used to tweak the same generation with different prompts. If not provided, a latentstensor will ge generated by sampling using the supplied random `generator`.prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument.output_type (`str`, *optional*, defaults to `"pil"`):The output format of the generate image. Choose between[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.return_dict (`bool`, *optional*, defaults to `True`):Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of aplain tuple.callback (`Callable`, *optional*):A function that will be called every `callback_steps` steps during inference. The function will becalled with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.callback_steps (`int`, *optional*, defaults to 1):The frequency at which the `callback` function will be called. 
If not specified, the callback will becalled at every step.cross_attention_kwargs (`dict`, *optional*):A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under`self.processor` in[diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).Examples:Returns:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple.When returning a tuple, the first element is a list with the generated images, and the second element is alist of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"(nsfw) content, according to the `safety_checker`."""# 0. Default height and width to unetheight, width = processors[0].default_height_width(height, width)# 1. Check inputs. Raise error if not correctself.check_inputs(prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds)for processor in processors:processor.check_inputs(prompt, prompt_embeds)# 2. Define call parametersif prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]device = self._execution_device# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`# corresponds to doing no classifier free guidance.do_classifier_free_guidance = guidance_scale > 1.0# 3. Encode input promptprompt_embeds = self._encode_prompt(prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt,prompt_embeds=prompt_embeds,negative_prompt_embeds=negative_prompt_embeds,)# 4. Prepare imagefor processor in processors:processor.prepare_image(width=width,height=height,batch_size=batch_size * num_images_per_prompt,num_images_per_prompt=num_images_per_prompt,device=device,do_classifier_free_guidance=do_classifier_free_guidance,)# 5. Prepare timestepsself.scheduler.set_timesteps(num_inference_steps, device=device)timesteps = self.scheduler.timesteps# 6. Prepare latent variablesnum_channels_latents = self.unet.in_channelslatents = self.prepare_latents(batch_size * num_images_per_prompt,num_channels_latents,height,width,prompt_embeds.dtype,device,generator,latents,)# 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipelineextra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)# 8. 
Denoising loopnum_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.orderwith self.progress_bar(total=num_inference_steps) as progress_bar:for i, t in enumerate(timesteps):# expand the latents if we are doing classifier free guidancelatent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latentslatent_model_input = self.scheduler.scale_model_input(latent_model_input, t)# controlnet inferencefor i, processor in enumerate(processors):down_samples, mid_sample = processor(latent_model_input,t,encoder_hidden_states=prompt_embeds,return_dict=False,)if i == 0:down_block_res_samples, mid_block_res_sample = down_samples, mid_sampleelse:down_block_res_samples = [d_prev + d_curr for d_prev, d_curr in zip(down_block_res_samples, down_samples)]mid_block_res_sample = mid_block_res_sample + mid_sample# predict the noise residualnoise_pred = self.unet(latent_model_input,t,encoder_hidden_states=prompt_embeds,cross_attention_kwargs=cross_attention_kwargs,down_block_additional_residuals=down_block_res_samples,mid_block_additional_residual=mid_block_res_sample,).sample# perform guidanceif do_classifier_free_guidance:noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)# compute the previous noisy sample x_t -> x_t-1latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample# call the callback, if providedif i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):progress_bar.update()if callback is not None and i % callback_steps == 0:callback(i, t, latents)# If we do sequential model offloading, let's offload unet and controlnet# manually for max memory savingsif hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:self.unet.to("cpu")torch.cuda.empty_cache()if output_type == "latent":image = latentshas_nsfw_concept = Noneelif output_type == "pil":# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# 10. Convert to PILimage = self.numpy_to_pil(image)else:# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# Offload last model to CPUif hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:self.final_offload_hook.offload()if not return_dict:return (image, has_nsfw_concept)return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)# demo & simple test
def main():from diffusers.utils import load_imagepipe = StableDiffusionMultiControlNetPipeline.from_pretrained("./model", safety_checker=None, torch_dtype=torch.float16).to("cuda")pipe.enable_xformers_memory_efficient_attention()controlnet_canny = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny",cache_dir='./controlmodel', torch_dtype=torch.float16).to("cuda")controlnet_pose = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose",cache_dir='./controlmodel', torch_dtype=torch.float16).to("cuda")canny_left = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_left.png")canny_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_right.png")pose_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose_right.png")image = pipe(prompt="best quality, extremely detailed",negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",processors=[ControlNetProcessor(controlnet_canny, canny_left),ControlNetProcessor(controlnet_canny, canny_right),],generator=torch.Generator(device="cpu").manual_seed(0),num_inference_steps=30,width=512,height=512,).images[0]image.save("./canny_left_right.png")image = pipe(prompt="best quality, extremely detailed",negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",processors=[ControlNetProcessor(controlnet_canny, canny_left,0.5),ControlNetProcessor(controlnet_pose, pose_right,1.6),],generator=torch.Generator(device="cpu").manual_seed(0),num_inference_steps=30,width=512,height=512,).images[0]image.save("./canny_left_pose_right.png")if __name__ == "__main__":main()
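For reference, later diffusers releases added native multi-ControlNet support, so on a recent version the custom pipeline above is no longer required: you can pass a list of ControlNets (and a matching list of hint images) straight into StableDiffusionControlNetPipeline. A minimal sketch, assuming such a version:

import torch
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image

controlnet_canny = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny")
controlnet_pose = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    controlnet=[controlnet_canny, controlnet_pose],  # a list becomes a multi-ControlNet under the hood
    safety_checker=None,
).to("cuda")

canny_left = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_left.png")
pose_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose_right.png")
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
    image=[canny_left, pose_right],                  # one hint image per ControlNet
    controlnet_conditioning_scale=[0.5, 1.0],        # per-ControlNet weight
    num_inference_steps=30,
    generator=torch.Generator("cpu").manual_seed(0),
).images[0]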
How to implement and add functionality that diffusers does not provide

Suppose we want to implement Stable Diffusion + ControlNet + inpainting ourselves. How should we go about it? Most of this task is wiring things together in the generation flow, so the code essentially lives in the stable_diffusion pipeline module.

Suppose our implementation looks like the following. (The code works against a specific diffusers version and will be upgraded later, so there is no rush to use it. The point is to walk through the diffusers code structure and show how a modified module can be plugged back into diffusers for your own use.)

# File: pipeline_stable_diffusion_controlnet_inpaint.py
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
from typing import Any, Callable, Dict, List, Optional, Union

import numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer

from ...models import AutoencoderKL, UNet2DConditionModel
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import is_accelerate_available, logging, randn_tensor, replace_example_docstring
from ..pipeline_utils import DiffusionPipeline
from . import StableDiffusionPipelineOutput
from .safety_checker import StableDiffusionSafetyCheckerlogger = logging.get_logger(__name__)  # pylint: disable=invalid-nameEXAMPLE_DOC_STRING = """Examples:```py>>> from diffusers import StableDiffusionControlNetPipeline>>> from diffusers.utils import load_image>>> # Canny edged image for control>>> canny_edged_image = load_image(...     "https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png"... )>>> pipe = StableDiffusionControlNetPipeline.from_pretrained("takuma104/control_sd15_canny").to("cuda")>>> image = pipe(prompt="best quality, extremely detailed", controlnet_hint=canny_edged_image).images[0]```
"""def prepare_mask_and_masked_image(image, mask):"""Prepares a pair (image, mask) to be consumed by the Stable Diffusion pipeline. This means that those inputs will beconverted to ``torch.Tensor`` with shapes ``batch x channels x height x width`` where ``channels`` is ``3`` for the``image`` and ``1`` for the ``mask``.The ``image`` will be converted to ``torch.float32`` and normalized to be in ``[-1, 1]``. The ``mask`` will bebinarized (``mask > 0.5``) and cast to ``torch.float32`` too.Args:image (Union[np.array, PIL.Image, torch.Tensor]): The image to inpaint.It can be a ``PIL.Image``, or a ``height x width x 3`` ``np.array`` or a ``channels x height x width````torch.Tensor`` or a ``batch x channels x height x width`` ``torch.Tensor``.mask (_type_): The mask to apply to the image, i.e. regions to inpaint.It can be a ``PIL.Image``, or a ``height x width`` ``np.array`` or a ``1 x height x width````torch.Tensor`` or a ``batch x 1 x height x width`` ``torch.Tensor``.Raises:ValueError: ``torch.Tensor`` images should be in the ``[-1, 1]`` range. ValueError: ``torch.Tensor`` maskshould be in the ``[0, 1]`` range. ValueError: ``mask`` and ``image`` should have the same spatial dimensions.TypeError: ``mask`` is a ``torch.Tensor`` but ``image`` is not(ot the other way around).Returns:tuple[torch.Tensor]: The pair (mask, masked_image) as ``torch.Tensor`` with 4dimensions: ``batch x channels x height x width``."""if isinstance(image, torch.Tensor):if not isinstance(mask, torch.Tensor):raise TypeError(f"`image` is a torch.Tensor but `mask` (type: {type(mask)} is not")# Batch single imageif image.ndim == 3:assert image.shape[0] == 3, "Image outside a batch should be of shape (3, H, W)"image = image.unsqueeze(0)# Batch and add channel dim for single maskif mask.ndim == 2:mask = mask.unsqueeze(0).unsqueeze(0)# Batch single mask or add channel dimif mask.ndim == 3:# Single batched mask, no channel dim or single mask not batched but channel dimif mask.shape[0] == 1:mask = mask.unsqueeze(0)# Batched masks no channel dimelse:mask = mask.unsqueeze(1)assert image.ndim == 4 and mask.ndim == 4, "Image and Mask must have 4 dimensions"assert image.shape[-2:] == mask.shape[-2:], "Image and Mask must have the same spatial dimensions"assert image.shape[0] == mask.shape[0], "Image and Mask must have the same batch size"# Check image is in [-1, 1]if image.min() < -1 or image.max() > 1:raise ValueError("Image should be in [-1, 1] range")# Check mask is in [0, 1]if mask.min() < 0 or mask.max() > 1:raise ValueError("Mask should be in [0, 1] range")# Binarize maskmask[mask < 0.5] = 0mask[mask >= 0.5] = 1# Image as float32image = image.to(dtype=torch.float32)elif isinstance(mask, torch.Tensor):raise TypeError(f"`mask` is a torch.Tensor but `image` (type: {type(image)} is not")else:# preprocess imageif isinstance(image, (PIL.Image.Image, np.ndarray)):image = [image]if isinstance(image, list) and isinstance(image[0], PIL.Image.Image):image = [np.array(i.convert("RGB"))[None, :] for i in image]image = np.concatenate(image, axis=0)elif isinstance(image, list) and isinstance(image[0], np.ndarray):image = np.concatenate([i[None, :] for i in image], axis=0)image = image.transpose(0, 3, 1, 2)image = torch.from_numpy(image).to(dtype=torch.float32) / 127.5 - 1.0# preprocess maskif isinstance(mask, (PIL.Image.Image, np.ndarray)):mask = [mask]if isinstance(mask, list) and isinstance(mask[0], PIL.Image.Image):mask = np.concatenate([np.array(m.convert("L"))[None, None, :] for m in mask], axis=0)mask = mask.astype(np.float32) / 255.0elif 
isinstance(mask, list) and isinstance(mask[0], np.ndarray):mask = np.concatenate([m[None, None, :] for m in mask], axis=0)mask[mask < 0.5] = 0mask[mask >= 0.5] = 1mask = torch.from_numpy(mask)masked_image = image * (mask < 0.5)return mask, masked_imageclass StableDiffusionControlNetInpaintPipeline(DiffusionPipeline):r"""Pipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([`AutoencoderKL`]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([`CLIPTextModel`]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (`CLIPTokenizer`):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.controlnet ([`UNet2DConditionModel`]):[ControlNet](https://arxiv.org/abs/2302.05543) architecture to generate guidance.scheduler ([`SchedulerMixin`]):A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].safety_checker ([`StableDiffusionSafetyChecker`]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([`CLIPFeatureExtractor`]):Model that extracts features from generated images to be used as inputs for the `safety_checker`."""def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,controlnet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool = True,):super().__init__()self.register_modules(vae=vae,text_encoder=text_encoder,tokenizer=tokenizer,unet=unet,controlnet=controlnet,scheduler=scheduler,safety_checker=safety_checker,feature_extractor=feature_extractor,)self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checker=requires_safety_checker)def enable_vae_slicing(self):r"""Enable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. This is useful to save some memory and allow larger batch sizes."""self.vae.enable_slicing()def disable_vae_slicing(self):r"""Disable sliced VAE decoding. If `enable_vae_slicing` was previously invoked, this method will go back tocomputing decoding in one step."""self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, significantly reducing memory usage. 
When called, unet,text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to a`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called."""if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError("Please install accelerate via `pip install accelerate`")device = torch.device(f"cuda:{gpu_id}")for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)@propertydef _execution_device(self):r"""Returns the device on which the pipeline's models will be executed. After calling`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's modulehooks."""if self.device != torch.device("meta") or not hasattr(self.unet, "_hf_hook"):return self.devicefor module in self.unet.modules():if (hasattr(module, "_hf_hook")and hasattr(module._hf_hook, "execution_device")and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.devicedef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt=None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,):r"""Encodes the prompt into text encoder hidden states.Args:prompt (`str` or `List[str]`, *optional*):prompt to be encodeddevice: (`torch.device`):torch devicenum_images_per_prompt (`int`):number of images that should be generated per promptdo_classifier_free_guidance (`bool`):whether to use classifier free guidance or notnegative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. 
                If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
                argument.
        """
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        if prompt_embeds is None:
            text_inputs = self.tokenizer(
                prompt,
                padding="max_length",
                max_length=self.tokenizer.model_max_length,
                truncation=True,
                return_tensors="pt",
            )
            text_input_ids = text_inputs.input_ids
            untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids

            if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(
                text_input_ids, untruncated_ids
            ):
                removed_text = self.tokenizer.batch_decode(
                    untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1]
                )
                logger.warning(
                    "The following part of your input was truncated because CLIP can only handle sequences up to"
                    f" {self.tokenizer.model_max_length} tokens: {removed_text}"
                )

            if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:
                attention_mask = text_inputs.attention_mask.to(device)
            else:
                attention_mask = None

            prompt_embeds = self.text_encoder(
                text_input_ids.to(device),
                attention_mask=attention_mask,
            )
            prompt_embeds = prompt_embeds[0]

        prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)

        bs_embed, seq_len, _ = prompt_embeds.shape
        # duplicate text embeddings for each generation per prompt, using mps friendly method
        prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
        prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)

        # get unconditional embeddings for classifier free guidance
        if do_classifier_free_guidance and negative_prompt_embeds is None:
            uncond_tokens: List[str]
            if negative_prompt is None:
                uncond_tokens = [""] * batch_size
            elif type(prompt) is not type(negative_prompt):
                raise TypeError(
                    f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
                    f" {type(prompt)}."
                )
            elif isinstance(negative_prompt, str):
                uncond_tokens = [negative_prompt]
            elif batch_size != len(negative_prompt):
                raise ValueError(
                    f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
                    f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
                    " the batch size of `prompt`."
                )
            else:
                uncond_tokens = negative_prompt

            max_length = prompt_embeds.shape[1]
            uncond_input = self.tokenizer(
                uncond_tokens,
                padding="max_length",
                max_length=max_length,
                truncation=True,
                return_tensors="pt",
            )

            if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:
                attention_mask = uncond_input.attention_mask.to(device)
            else:
                attention_mask = None

            negative_prompt_embeds = self.text_encoder(
                uncond_input.input_ids.to(device),
                attention_mask=attention_mask,
            )
            negative_prompt_embeds = negative_prompt_embeds[0]

        if do_classifier_free_guidance:
            # duplicate unconditional embeddings for each generation per prompt, using mps friendly method
            seq_len = negative_prompt_embeds.shape[1]

            negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)

            negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)
            negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)

            # For classifier free guidance, we need to do two forward passes.
            # Here we concatenate the unconditional and text embeddings into a single batch
            # to avoid doing two forward passes
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        return prompt_embeds

    def run_safety_checker(self, image, device, dtype):
        if self.safety_checker is not None:
            safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)
            image, has_nsfw_concept = self.safety_checker(
                images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
            )
        else:
            has_nsfw_concept = None
        return image, has_nsfw_concept

    def decode_latents(self, latents):
        latents = 1 / self.vae.config.scaling_factor * latents
        image = self.vae.decode(latents).sample
        image = (image / 2 + 0.5).clamp(0, 1)
        # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        return image

    def prepare_extra_step_kwargs(self, generator, eta):
        # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
        # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
        # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
        # and should be between [0, 1]
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # check if the scheduler accepts generator
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        return extra_step_kwargs

    def check_inputs(
        self,
        prompt,
        height,
        width,
        callback_steps,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
    ):
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        if (callback_steps is None) or (
            callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
        ):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

        if prompt is not None and prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        elif prompt is None and prompt_embeds is None:
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        if negative_prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            latents = latents.to(device)

        # scale the initial noise by the standard deviation required by the scheduler
        latents = latents * self.scheduler.init_noise_sigma
        return latents

    def controlnet_hint_conversion(self, controlnet_hint, height, width, num_images_per_prompt):
        # normalize the user-supplied hint (tensor / ndarray / PIL image) into a bchw tensor on the ControlNet device
        channels = 3
        if isinstance(controlnet_hint, torch.Tensor):
            # torch.Tensor: acceptable shapes are any of chw, bchw(b==1) or bchw(b==num_images_per_prompt)
            shape_chw = (channels, height, width)
            shape_bchw = (1, channels, height, width)
            shape_nchw = (num_images_per_prompt, channels, height, width)
            if controlnet_hint.shape in [shape_chw, shape_bchw, shape_nchw]:
                controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)
                if controlnet_hint.shape != shape_nchw:
                    controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)
                return controlnet_hint
            else:
                raise ValueError(
                    f"Acceptable shape of `controlnet_hint` are any of ({channels}, {height}, {width}),"
                    + f" (1, {channels}, {height}, {width}) or ({num_images_per_prompt}, "
                    + f"{channels}, {height}, {width}) but is {controlnet_hint.shape}"
                )
        elif isinstance(controlnet_hint, np.ndarray):
            # np.ndarray: acceptable shape is any of hw, hwc, bhwc(b==1) or bhwc(b==num_images_per_prompt)
            # hwc is opencv compatible image format. Color channel must be BGR Format.
            if controlnet_hint.shape == (height, width):
                controlnet_hint = np.repeat(controlnet_hint[:, :, np.newaxis], channels, axis=2)  # hw -> hwc(c==3)
            shape_hwc = (height, width, channels)
            shape_bhwc = (1, height, width, channels)
            shape_nhwc = (num_images_per_prompt, height, width, channels)
            if controlnet_hint.shape in [shape_hwc, shape_bhwc, shape_nhwc]:
                controlnet_hint = torch.from_numpy(controlnet_hint.copy())
                controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)
                controlnet_hint /= 255.0
                if controlnet_hint.shape != shape_nhwc:
                    controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)
                controlnet_hint = controlnet_hint.permute(0, 3, 1, 2)  # b h w c -> b c h w
                return controlnet_hint
            else:
                raise ValueError(
                    f"Acceptable shape of `controlnet_hint` are any of ({height}, {width}), "
                    + f"({height}, {width}, {channels}), "
                    + f"(1, {height}, {width}, {channels}) or "
                    + f"({num_images_per_prompt}, {height}, {width}, {channels}) but is {controlnet_hint.shape}"
                )
        elif isinstance(controlnet_hint, PIL.Image.Image):
            if controlnet_hint.size == (width, height):
                controlnet_hint = controlnet_hint.convert("RGB")  # make sure 3 channel RGB format
                controlnet_hint = np.array(controlnet_hint)  # to numpy
                controlnet_hint = controlnet_hint[:, :, ::-1]  # RGB -> BGR
                return self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)
            else:
                raise ValueError(
                    f"Acceptable image size of `controlnet_hint` is ({width}, {height}) but is {controlnet_hint.size}"
                )
        else:
            raise ValueError(
                f"Acceptable type of `controlnet_hint` are any of torch.Tensor, np.ndarray, PIL.Image.Image but is {type(controlnet_hint)}"
            )

    def prepare_mask_latents(
        self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
    ):
        # resize the mask to latents shape as we concatenate the mask to the latents
        # we do that before converting to dtype to avoid breaking in case we're using cpu_offload
        # and half precision
        mask = torch.nn.functional.interpolate(
            mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
        )
        mask = mask.to(device=device, dtype=dtype)

        masked_image = masked_image.to(device=device, dtype=dtype)

        # encode the mask image into latents space so we can concatenate it to the latents
        if isinstance(generator, list):
            masked_image_latents = [
                self.vae.encode(masked_image[i : i + 1]).latent_dist.sample(generator=generator[i])
                for i in range(batch_size)
            ]
            masked_image_latents = torch.cat(masked_image_latents, dim=0)
        else:
            masked_image_latents = self.vae.encode(masked_image).latent_dist.sample(generator=generator)
        masked_image_latents = self.vae.config.scaling_factor * masked_image_latents

        # duplicate mask and masked_image_latents for each generation per prompt, using mps friendly method
        if mask.shape[0] < batch_size:
            if not batch_size % mask.shape[0] == 0:
                raise ValueError(
                    "The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"
                    f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"
                    " of masks that you pass is divisible by the total requested batch size."
                )
            mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)
        if masked_image_latents.shape[0] < batch_size:
            if not batch_size % masked_image_latents.shape[0] == 0:
                raise ValueError(
                    "The passed images and the required batch size don't match. Images are supposed to be duplicated"
                    f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."
                    " Make sure the number of images that you pass is divisible by the total requested batch size."
                )
            masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)

        mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
        masked_image_latents = (
            torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
        )

        # aligning device to prevent device errors when concating it with the latent model input
        masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)
        return mask, masked_image_latents

    @torch.no_grad()
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        num_inference_steps: int = 50,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.FloatTensor] = None,
        prompt_embeds: Optional[torch.FloatTensor] = None,
        negative_prompt_embeds: Optional[torch.FloatTensor] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        controlnet_hint: Optional[Union[torch.FloatTensor, np.ndarray, PIL.Image.Image]] = None,
        image: Union[torch.FloatTensor, PIL.Image.Image] = None,
        mask_image: Union[torch.FloatTensor, PIL.Image.Image] = None,
    ):
        r"""
        Function invoked when calling the pipeline for generation.

        Args:
            prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`
                instead.
            height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
                The height in pixels of the generated image.
            width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
                The width in pixels of the generated image.
            num_inference_steps (`int`, *optional*, defaults to 50):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            guidance_scale (`float`, *optional*, defaults to 7.5):
                Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
                `guidance_scale` is defined as `w` of equation 2. of [Imagen
                Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
                1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,
                usually at the expense of lower image quality.
            negative_prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts not to guide the image generation. If not defined, one has to pass
                `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale`
                is less than `1`).
            num_images_per_prompt (`int`, *optional*, defaults to 1):
                The number of images to generate per prompt.
            eta (`float`, *optional*, defaults to 0.0):
                Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
                [`schedulers.DDIMScheduler`], will be ignored for others.
            generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
                One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)
                to make generation deterministic.
            latents (`torch.FloatTensor`, *optional*):
                Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
                generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
                tensor will be generated by sampling using the supplied random `generator`.
            prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
                provided, text embeddings will be generated from `prompt` input argument.
            negative_prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
                weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
                argument.
            output_type (`str`, *optional*, defaults to `"pil"`):
                The output format of the generated image. Choose between
                [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
                plain tuple.
            callback (`Callable`, *optional*):
                A function that will be called every `callback_steps` steps during inference. The function will be
                called with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.
            callback_steps (`int`, *optional*, defaults to 1):
                The frequency at which the `callback` function will be called. If not specified, the callback will be
                called at every step.
            cross_attention_kwargs (`dict`, *optional*):
                A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under
                `self.processor` in
                [diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).
            controlnet_hint (`torch.FloatTensor`, `np.ndarray` or `PIL.Image.Image`, *optional*):
                ControlNet input embedding. ControlNet generates guidances using this input embedding. If the type is
                specified as `torch.FloatTensor`, it is passed to ControlNet as is. If the type is `np.ndarray`, it is
                assumed to be an OpenCV compatible image format. `PIL.Image.Image` can also be accepted as an image.
                The size of all these types must correspond to the output image size.

        Examples:

        Returns:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a
            `tuple`. When returning a tuple, the first element is a list with the generated images, and the second
            element is a list of `bool`s denoting whether the corresponding generated image likely represents
            "not-safe-for-work" (nsfw) content, according to the `safety_checker`.
        """
        # 0. Default height and width to unet
        height = height or self.unet.config.sample_size * self.vae_scale_factor
        width = width or self.unet.config.sample_size * self.vae_scale_factor

        # 1. Control Embedding check & conversion
        if controlnet_hint is not None:
            controlnet_hint = self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)

        # 2. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds
        )

        # 3. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
        # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.
        do_classifier_free_guidance = guidance_scale > 1.0

        # 4. Encode input prompt
        prompt_embeds = self._encode_prompt(
            prompt,
            device,
            num_images_per_prompt,
            do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
        )

        # build the binary mask and the masked image from the user-supplied image / mask pair
        mask, masked_image = prepare_mask_and_masked_image(image, mask_image)

        # 5. Prepare timesteps
        self.scheduler.set_timesteps(num_inference_steps, device=device)
        timesteps = self.scheduler.timesteps

        # 6. Prepare latent variables
        # use the VAE latent channel count here: the mask and the masked-image latents are concatenated
        # onto these noise latents below, which together form the 9 input channels the inpainting UNet expects
        num_channels_latents = self.vae.config.latent_channels
        latents = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
        )

        mask, masked_image_latents = self.prepare_mask_latents(
            mask,
            masked_image,
            batch_size * num_images_per_prompt,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            do_classifier_free_guidance,
        )
        num_channels_mask = mask.shape[1]
        num_channels_masked_image = masked_image_latents.shape[1]

        # 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 8. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # expand the latents if we are doing classifier free guidance
                latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                if controlnet_hint is not None:
                    # ControlNet produces the residuals that are injected into the UNet blocks
                    control = self.controlnet(
                        latent_model_input, t, encoder_hidden_states=prompt_embeds, controlnet_hint=controlnet_hint
                    )
                    control = [item for item in control]
                    # inpainting input: noise latents + mask + masked-image latents
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)
                    noise_pred = self.unet(
                        latent_model_input,
                        t,
                        encoder_hidden_states=prompt_embeds,
                        cross_attention_kwargs=cross_attention_kwargs,
                        control=control,
                    ).sample
                else:
                    # predict the noise residual without ControlNet guidance
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)
                    noise_pred = self.unet(
                        latent_model_input,
                        t,
                        encoder_hidden_states=prompt_embeds,
                        cross_attention_kwargs=cross_attention_kwargs,
                    ).sample

                # perform guidance
                if do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

                # compute the previous noisy sample x_t -> x_t-1
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample

                # call the callback, if provided
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        callback(i, t, latents)

        if output_type == "latent":
            image = latents
            has_nsfw_concept = None
        elif output_type == "pil":
            # 8. Post-processing
            image = self.decode_latents(latents)

            # 9. Run safety checker
            image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)

            # 10. Convert to PIL
            image = self.numpy_to_pil(image)
        else:
            # 8. Post-processing
            image = self.decode_latents(latents)

            # 9. Run safety checker
            image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)

        if not return_dict:
            return (image, has_nsfw_concept)

        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
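
With the __call__ above, the pipeline accepts a text prompt, an inpainting image/mask pair and a ControlNet hint in a single pass. The following is only a minimal usage sketch: the class and module names, the checkpoint IDs and the file names are placeholders rather than anything fixed by the code above, the constructor is assumed to mirror the official ControlNet pipeline's (i.e. it registers a controlnet module), and the ControlNet/UNet weights have to come from a diffusers build that supports the controlnet_hint / control arguments used in the denoising loop (the upstream classes do not accept them).

import torch
from diffusers import ControlNetModel
from diffusers.utils import load_image

# placeholder import: point this at the module / class name you actually used for the code above
from pipeline_stable_diffusion_controlnet_inpaint import StableDiffusionControlNetInpaintPipeline

controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", cache_dir="./")
pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    "runwayml/stable-diffusion-inpainting",  # 9-channel inpainting UNet, matching the mask concatenation above
    controlnet=controlnet,
    safety_checker=None,
).to("cuda")

# all three images are assumed to already be at the target resolution (e.g. 512x512)
init_image = load_image("frame_0001.png")   # frame to be partially regenerated
mask_image = load_image("mask_0001.png")    # white = region to repaint
canny_hint = load_image("canny_0001.png")   # ControlNet condition image (e.g. canny edges)

generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
    image=init_image,
    mask_image=mask_image,
    controlnet_hint=canny_hint,
    num_inference_steps=30,
    generator=generator,
).images[0]
image.save("out_0001.png")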

Place the new pipeline file under:

Every __init__.py under the following project directories needs to have the newly added class registered in it.

For the exact entries to add, follow the way the existing classes are written in each file; it generally looks like this:
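
A minimal sketch of those additions (the module file name pipeline_stable_diffusion_controlnet_inpaint and the class name StableDiffusionControlNetInpaintPipeline below are placeholders for whatever you actually named the file and the class; the paths assume a source checkout of diffusers):

# src/diffusers/pipelines/stable_diffusion/__init__.py
# add the import next to the existing StableDiffusion* pipelines, inside the same
# torch/transformers availability guard that they use
from .pipeline_stable_diffusion_controlnet_inpaint import StableDiffusionControlNetInpaintPipeline

# src/diffusers/pipelines/__init__.py
from .stable_diffusion import StableDiffusionControlNetInpaintPipeline

# src/diffusers/__init__.py
from .pipelines import StableDiffusionControlNetInpaintPipeline

After these three edits (and reinstalling the package, e.g. pip install -e .), the custom pipeline can be imported like any built-in one: from diffusers import StableDiffusionControlNetInpaintPipeline.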
