Сделал тут по другому нейронку из туториала по торчу, решил выложить сюда готовый код:

!unzip /content/PennFudanPed.zip

import os

import numpy as np

import torch

from PIL import Image

class PennFudanDataset(torch.utils.data.Dataset):

    def __init__(self, root, transforms):

        self.root = root

        self.transforms = transforms

        # load all image files, sorting them to

        # ensure that they are aligned

        self.imgs = list(sorted(os.listdir(os.path.join(root, "PNGImages"))))

        self.masks = list(sorted(os.listdir(os.path.join(root, "PedMasks"))))

    def __getitem__(self, idx):

        # load images and masks

        img_path = os.path.join(self.root, "PNGImages", self.imgs[idx])

        mask_path = os.path.join(self.root, "PedMasks", self.masks[idx])

        img = Image.open(img_path).convert("RGB")

        # note that we haven't converted the mask to RGB,

        # because each color corresponds to a different instance

        # with 0 being background

        mask = Image.open(mask_path)

        # convert the PIL Image into a numpy array

        mask = np.array(mask)

        # instances are encoded as different colors

        obj_ids = np.unique(mask)

        # first id is the background, so remove it

        obj_ids = obj_ids[1:]

        # split the color-encoded mask into a set

        # of binary masks

        masks = mask == obj_ids[:, None, None]

        # get bounding box coordinates for each mask

        num_objs = len(obj_ids)

        boxes = []

        for i in range(num_objs):

            pos = np.where(masks[i])

            xmin = np.min(pos[1])

            xmax = np.max(pos[1])

            ymin = np.min(pos[0])

            ymax = np.max(pos[0])

            boxes.append([xmin, ymin, xmax, ymax])

        # convert everything into a torch.Tensor

        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        # there is only one class

        labels = torch.ones((num_objs,), dtype=torch.int64)

        masks = torch.as_tensor(masks, dtype=torch.uint8)

        image_id = torch.tensor([idx])

        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        # suppose all instances are not crowd

        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        target = {}

        target["boxes"] = boxes

        target["labels"] = labels

        target["masks"] = masks

        target["image_id"] = image_id

        target["area"] = area

        target["iscrowd"] = iscrowd

        if self.transforms is not None:

            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):

        return len(self.imgs)

import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# load a model pre-trained on COCO

model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# replace the classifier with a new one, that has

# num_classes which is user-defined

num_classes = 2  # 1 class (person) + background

# get number of input features for the classifier

in_features = model.roi_heads.box_predictor.cls_score.in_features

# replace the pre-trained head with a new one

model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

import torchvision

from torchvision.models.detection import FasterRCNN

from torchvision.models.detection.rpn import AnchorGenerator

# load a pre-trained model for classification and return

# only the features

backbone = torchvision.models.mobilenet_v2(weights="DEFAULT").features

# FasterRCNN needs to know the number of

# output channels in a backbone. For mobilenet_v2, it's 1280

# so we need to add it here

backbone.out_channels = 1280

# let's make the RPN generate 5 x 3 anchors per spatial

# location, with 5 different sizes and 3 different aspect

# ratios. We have a Tuple[Tuple[int]] because each feature

# map could potentially have different sizes and

# aspect ratios

anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256, 512),),

                                   aspect_ratios=((0.5, 1.0, 2.0),))

# let's define what are the feature maps that we will

# use to perform the region of interest cropping, as well as

# the size of the crop after rescaling.

# if your backbone returns a Tensor, featmap_names is expected to

# be [0]. More generally, the backbone should return an

# OrderedDict[Tensor], and in featmap_names you can choose which

# feature maps to use.

roi_pooler = torchvision.ops.MultiScaleRoIAlign(featmap_names=['0'],

                                                output_size=7,

                                                sampling_ratio=2)

# put the pieces together inside a FasterRCNN model

model = FasterRCNN(backbone,

                   num_classes=2,

                   rpn_anchor_generator=anchor_generator,

                   box_roi_pool=roi_pooler)

import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

def get_model_instance_segmentation(num_classes):

    # load an instance segmentation model pre-trained on COCO

    model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")

    # get number of input features for the classifier

    in_features = model.roi_heads.box_predictor.cls_score.in_features

    # replace the pre-trained head with a new one

    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    # now get the number of input features for the mask classifier

    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels

    hidden_layer = 256

    # and replace the mask predictor with a new one

    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,

                                                       hidden_layer,

                                                       num_classes)

    return model

from torch import Tensor, nn

from torchvision import ops

from torchvision.transforms import functional as F, transforms as T

class Compose():

  def __init__(self, transforms):

    self.transforms = transforms

  def __call__(self, image, target):

    for t in self.transforms:

      image, target = t(image, target)

    return image, target

from typing import *

class PILToTensor(nn.Module):

  def forward(self, image, target = None):

    image = F.pil_to_tensor(image)

    return image, target

class ConvertImageDtype(nn.Module):

  def __init__(self, dtype):

    super().__init__()

    self.dtype = dtype

  def forward(self, image, target):

    image = F.convert_image_dtype(image,dtype = self.dtype)

    return image, target

class RandomHorizontalFlip(T.RandomHorizontalFlip):

  def forward(self, image, target = None):

    if torch.rand(1) < self.p:

      image = F.hflip(image)

      if target is not None:

        _, _, width = F.get_dimensions(image)

        target['boxes'][:, [2, 0]] = width - target['boxes'][:, [0, 2]]

        if 'mask' in target:

          target['masks'] = target['masks'].flip(-1)

    return image, target

def get_transforms(train: bool = False):

  transforms = [

      PILToTensor(),

      ConvertImageDtype(torch.float)

  ]

  if train:

    transforms.append(RandomHorizontalFlip(0.5))

  return Compose(transforms)

def collate_fn(batch):

  return tuple(zip(*batch))

model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

dataset = PennFudanDataset('PennFudanPed', get_transforms(train=True))

data_loader = torch.utils.data.DataLoader(

 dataset, batch_size=2, shuffle=True, num_workers=2,

 collate_fn=collate_fn)

# For Training

images, targets = next(iter(data_loader))

images = list(image for image in images)

targets = [{k: v for k, v in t.items()} for t in targets]

output = model(images,targets)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

num_classes = 2

dataset = PennFudanDataset('PennFudanPed', get_transforms(train=True))

dataset_test = PennFudanDataset('PennFudanPed', get_transforms(train=False))

indices = torch.randperm(len(dataset)).tolist()

dataset = torch.utils.data.Subset(dataset, indices[:-50])

dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])

    # define training and validation data loaders

data_loader = torch.utils.data.DataLoader(

    dataset, batch_size=2, shuffle=True, num_workers=2,

    collate_fn=collate_fn)

data_loader_test = torch.utils.data.DataLoader(

    dataset_test, batch_size=1, shuffle=False, num_workers=2,

    collate_fn=collate_fn)

# get the model using our helper function

model = get_model_instance_segmentation(num_classes)

# move model to the right device

model.to(device)

# construct an optimizer

params = [p for p in model.parameters() if p.requires_grad]

optimizer = torch.optim.SGD(params, lr=0.005,

                                momentum=0.9, weight_decay=0.0005)

# and a learning rate scheduler

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,

                                                step_size=3,

                                                gamma=0.1)

from tqdm import tqdm

def train_one_epoch(model, dataloader, optimizer, device):

  model.train()

  epoch_loss = 0.0

  for images, targets in tqdm(dataloader):

    images = [image.to(device) for image in images]

    targets = [{k: v.to(device) for k, v in target.items()} for target in targets]

    loss_dict = model(images, targets)

    loss = sum(l for l in loss_dict.values())

    optimizer.zero_grad()

    loss.backward()

    optimizer.step()

    epoch_loss += loss.item()

  return epoch_loss / len(dataloader)

@torch.no_grad()

def evaluate(model, dataloader, device):

  #model.eval()

  epoch_loss = 0.0

  for images, targets in tqdm(dataloader):

    images = [image.to(device) for image in images]

    targets = [{k: v.to(device) for k, v in target.items()} for target in targets]

    loss_dict = model(images, targets)

    loss = sum(l for l in loss_dict.values())

    epoch_loss += loss.item()

  return epoch_loss / len(dataloader)

epochs = 10

for epoch in range(1, epochs + 1):

  train_loss = train_one_epoch(model, data_loader, optimizer, device)

  val_loss = evaluate(model, data_loader_test, device)

  lr_scheduler.step()

  print(f'Epoch {epoch}\nTrain Loss = {train_loss:.2f}\nValidation Loss = {val_loss:.2f}')

img, _ = dataset_test[0]

model.eval()

with torch.no_grad():

  prediction = model([img.to(device)])

from PIL import ImageDraw

img_copy = Image.fromarray(prediction[0]['masks'][0, 0].mul(255).byte().cpu().numpy())

state_dict = prediction[0]

draw = ImageDraw.Draw(img_copy)

for i in range(len(state_dict['boxes'])):

  draw.rectangle(state_dict['boxes'][i].cpu().numpy(), outline = "#FF0000")

img_copy

with open('segmentation_model.pt', 'wb') as f:

  torch.save(model, f)

device = torch.device('cuda:0')

model = get_model_instance_segmentation(2)

model.to(device)

model.load_state_dict(torch.load('./segmentation_model (1).pt'))

model.eval()

from PIL import ImageDraw

import cv2

def convert_to_pil(img: np.ndarray) -> Image:

    return Image.fromarray(img)

def predict(img: Image):

    img = F.pil_to_tensor(img)

    img = F.convert_image_dtype(img, dtype=torch.float)

    img = img.to(device)

    with torch.no_grad():

        prediction = model([img])

    return prediction

def draw_boxes(img: Image, prediction):

    state_dict = prediction[0]

    draw = ImageDraw.Draw(img)

    for i in range(len(state_dict['labels'])):

        draw.rectangle(state_dict['boxes'][i].cpu().numpy(), outline="#FF0000")

    return np.array(img)

from tqdm import tqdm

import math

def process_video(model, video_path, frame_rate = 30, max_frame = None):

  capture = cv2.VideoCapture(video_path)

  fps = capture.get(cv2.CAP_PROP_FPS)

  print(f'Frame Rate: {fps}')

  lenght = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))

  multipier = int(math.ceil(fps / frame_rate))  if fps > frame_rate else 1

  ret = True

  box_frames = []

  progress = iter(tqdm(range(lenght)))

  while ret:

    ret, frame = capture.read()

    if (n:= next(progress)) % multipier != 0:

      if max_frame is not None and n >= max_frame:

        break

      continue

    img = convert_to_pil(frame)

    prediction = predict(img)

    box_frames.append(draw_boxes(img, prediction))

  return box_frames

frames = process_video(model, './videoplayback (1).mp4', 30, 1000)

fourcc = cv2.VideoWriter_fourcc(*'XVID')

writer = cv2.VideoWriter('./out.avi', fourcc, 30.0, (640, 360))

for frame in frames:

  writer.write(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

writer.release()

len(frames)

11
9 комментариев

GitHub.com - говорят норм хранилище для срани

1

Там сказали, что такого говна им не надо, вам с ним на дтф

Это уровнем выше. Начинающий программист обязан срать в свежее на жтв

Ну или гист на крайний случай

1

prediction

Image.fromarray(img.mul(255).permute(1, 2, 0).byte().numpy())

Image.fromarray(prediction[0]['masks'][0, 0].mul(255).byte().cpu().numpy())