# -*- coding: utf-8 -*-
"""electrical_substation_detection.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1Xy8GKRUCgGimAybXZlr9_FRHaZdTx3SW
"""

# Notebook shell commands: they are not valid Python in a plain .py file.
# Run them in a Colab cell (or install the packages) before executing this script.
# !pip install -U git+https://github.com/albu/albumentations --no-cache-dir
# !pip install segmentation_models

import os
import sys
import random
import math
import numpy as np
import skimage.io
import matplotlib
import matplotlib.pyplot as plt
import segmentation_models as sm
sm.set_framework('tf.keras')
sm.framework()
import albumentations as A
import cv2
import tensorflow as tf
import tensorflow.keras as keras
tf.config.run_functions_eagerly(True)

from google.colab import drive
drive.mount('/content/drive')

x_train_dir = '/content/drive/MyDrive/substation_satellite_data/train/image_chips'
y_train_dir = '/content/drive/MyDrive/substation_satellite_data/train/labels'

x_valid_dir = '/content/drive/MyDrive/substation_satellite_data/validation/image_chips'
y_valid_dir = '/content/drive/MyDrive/substation_satellite_data/validation/labels'


def visualize(**images):
    """Plot images in one row.

    Each keyword argument becomes one subplot; the keyword name, with
    underscores replaced by spaces and title-cased, is used as its title.
    """
    n = len(images)
    plt.figure(figsize=(16, 5))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()


def denormalize(x):
    """Scale image to range 0..1 for correct plot.

    The 2nd and 98th percentiles of ``x`` are mapped to 0 and 1 and the
    result is clipped to that range.
    """
    x_max = np.percentile(x, 98)
    x_min = np.percentile(x, 2)
    span = x_max - x_min
    if span == 0:
        # Constant input: dividing by zero would fill the result with NaN.
        return np.zeros_like(x, dtype=float)
    x = (x - x_min) / span
    x = x.clip(0, 1)
    return x


class Dataset:
    """Read images and masks from disk, then apply augmentation and preprocessing.

    Args:
        images_dir: directory containing the images.
        masks_dir: directory containing the masks; each mask is looked up
            under the same file name as its image.
        classes: class names to extract from the mask (matched
            case-insensitively against ``CLASSES``). Defaults to all classes.
        augmentation: optional callable invoked as
            ``augmentation(image=..., mask=...)`` returning a dict with
            ``'image'`` and ``'mask'`` entries.
        preprocessing: optional callable with the same calling convention,
            applied after ``augmentation``.
    """

    CLASSES = ['nodetect', 'es']

    def __init__(self, images_dir, masks_dir, classes=None, augmentation=None, preprocessing=None):
        # os.listdir order is arbitrary; sort so indices are reproducible.
        self.ids = sorted(os.listdir(images_dir))
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]

        # Iterating over ``None`` would raise TypeError; fall back to every class.
        if classes is None:
            classes = self.CLASSES
        # Convert class names to the values they take in the scaled mask.
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]

        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):
        """Return the ``(image, mask)`` pair at index ``i``.

        Raises:
            FileNotFoundError: if the image or mask file cannot be read.
        """
        image = cv2.imread(self.images_fps[i])
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {self.images_fps[i]}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(self.masks_fps[i], 0)  # flag 0: load as grayscale
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {self.masks_fps[i]}")

        # Scale so the brightest pixel becomes 1. An all-zero mask has max 0;
        # dividing would yield NaN and no pixel would match any class value.
        peak = mask.max()
        if peak > 0:
            mask = mask / peak

        # One binary channel per requested class.
        masks = [(mask == v) for v in self.class_values]
        mask = np.stack(masks, axis=-1).astype('float')

        # With more than one class channel, append a background channel.
        if mask.shape[-1] != 1:
            background = 1 - mask.sum(axis=-1, keepdims=True)
            mask = np.concatenate((mask, background), axis=-1)

        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        return image, mask

    def __len__(self):
        return len(self.ids)


class Dataloder(keras.utils.Sequence):
    """Load data from dataset and form batches.

    Args:
        dataset: instance of Dataset class for image loading and preprocessing.
        batch_size: Integer number of images in batch.
        shuffle: Boolean, if `True` shuffle image indexes each epoch.
    """

    def __init__(self, dataset, batch_size=1, shuffle=False):
        super().__init__()
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(len(dataset))

        self.on_epoch_end()

    def __getitem__(self, i):
        """Return batch ``i`` as a pair of stacked arrays ``(images, masks)``."""
        start = i * self.batch_size
        stop = (i + 1) * self.batch_size
        # Go through self.indexes so that the per-epoch shuffle actually
        # changes which samples land in each batch.
        data = [self.dataset[self.indexes[j]] for j in range(start, stop)]

        # Transpose the list of (image, mask) pairs into two stacked arrays.
        batch = [np.stack(samples, axis=0) for samples in zip(*data)]
        return batch[0], batch[1]

    def __len__(self):
        """Number of full batches per epoch (a trailing partial batch is dropped)."""
        return len(self.indexes) // self.batch_size

    def on_epoch_end(self):
        """Reshuffle the sample order when ``shuffle`` is enabled."""
        if self.shuffle:
            self.indexes = np.random.permutation(self.indexes)


# Look at one raw training sample.
dataset = Dataset(x_train_dir, y_train_dir, classes=['NoDetect', 'ES'])

image, mask = dataset[10]
visualize(image=image, substation_mask=mask[..., 1].squeeze())

IMG_SIZE = 512


def round_clip_0_1(x, **kwargs):
    """Round ``x`` and clip it to the range 0..1 (applied to masks after augmentation)."""
    return x.round().clip(0, 1)


def normalize_albumenation(x, **kwargs):
    """Identity placeholder for an extra image normalisation step."""
    return x


def get_training_augmentation():
    """Build the training pipeline: flip, shift/scale/rotate, pad, crop to IMG_SIZE."""
    train_transform = [
        A.HorizontalFlip(p=0.5),
        # border_mode=0 is cv2.BORDER_CONSTANT.
        A.ShiftScaleRotate(scale_limit=0.1, rotate_limit=30, shift_limit=0.1, p=1, border_mode=0),
        # p=1.0 replaces the deprecated always_apply=True; both always apply the transform.
        A.PadIfNeeded(min_height=IMG_SIZE, min_width=IMG_SIZE, p=1.0, border_mode=0),
        A.RandomCrop(height=IMG_SIZE, width=IMG_SIZE, p=1.0),
        A.Lambda(mask=round_clip_0_1),
    ]
    return A.Compose(train_transform)


def get_validation_augmentation():
    """Build the validation pipeline: a crop to IMG_SIZE x IMG_SIZE.

    NOTE(review): the crop position is random, so validation scores are not
    exactly repeatable between runs - confirm this is intended.
    """
    test_transform = [
        A.RandomCrop(height=IMG_SIZE, width=IMG_SIZE, p=1.0),
    ]
    return A.Compose(test_transform)


def get_preprocessing(preprocessing_fn):
    """Construct the preprocessing transform.

    Args:
        preprocessing_fn (callable): data normalization function applied to
            the image; the mask is left untouched.

    Returns:
        An ``albumentations.Compose`` transform.
    """
    _transform = [
        A.Lambda(image=preprocessing_fn),
        A.Lambda(image=normalize_albumenation),
    ]
    return A.Compose(_transform)


# Look at the same sample after training augmentation.
dataset = Dataset(x_train_dir, y_train_dir, classes=['nodetect', 'es'], augmentation=get_training_augmentation())

image, mask = dataset[10]
visualize(
    image=image,
    substation_mask=mask[..., 1].squeeze(),
)

BASE = 'resnet34'
BATCH_SIZE = 8
CLASSES = ['es']
LR = 0.0001
EPOCHS = 150
version = 8

preprocess_input = sm.get_preprocessing(BASE)

# A single class is trained as binary segmentation; several classes get an
# extra background channel and a softmax output.
n_classes = 1 if len(CLASSES) == 1 else (len(CLASSES) + 1)
activation = 'sigmoid' if n_classes == 1 else 'softmax'

tf.keras.backend.clear_session()
model = sm.Unet(BASE, classes=n_classes, activation=activation, encoder_weights='imagenet')

optimizer = keras.optimizers.Adam(LR)

dice_loss = sm.losses.DiceLoss()
focal_loss = sm.losses.BinaryFocalLoss() if n_classes == 1 else sm.losses.CategoricalFocalLoss()
jacard_loss = sm.losses.JaccardLoss()
total_loss = dice_loss + jacard_loss + focal_loss

metrics = [sm.metrics.IOUScore(threshold=0.5), sm.metrics.FScore(threshold=0.5)]

model.compile(optimizer, total_loss, metrics)

train_dataset = Dataset(
    x_train_dir,
    y_train_dir,
    classes=CLASSES,
    augmentation=get_training_augmentation(),
    preprocessing=get_preprocessing(preprocess_input),
)

valid_dataset = Dataset(
    x_valid_dir,
    y_valid_dir,
    classes=CLASSES,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocess_input),
)

train_dataloader = Dataloder(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_dataloader = Dataloder(valid_dataset, batch_size=1, shuffle=False)

# Sanity-check the batch shapes; load the first batch once instead of twice.
sample_images, sample_masks = train_dataloader[0]
assert sample_images.shape == (BATCH_SIZE, IMG_SIZE, IMG_SIZE, 3)
assert sample_masks.shape == (BATCH_SIZE, IMG_SIZE, IMG_SIZE, n_classes)

callbacks = [
    keras.callbacks.ReduceLROnPlateau(factor=0.5, verbose=1),
]

# summary() prints by itself and returns None, so it is not wrapped in print().
model.summary()

# Model.fit accepts keras.utils.Sequence inputs directly; fit_generator is deprecated.
history = model.fit(
    train_dataloader,
    steps_per_epoch=len(train_dataloader),
    epochs=EPOCHS,
    callbacks=callbacks,
    validation_data=valid_dataloader,
    validation_steps=len(valid_dataloader),
)

# Plot training and validation curves.
plt.figure(figsize=(30, 10))
plt.subplot(121)
plt.plot(history.history['iou_score'])
plt.plot(history.history['val_iou_score'])
plt.title('Model iou_score')
plt.ylabel('iou_score')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

plt.subplot(122)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

# Run the trained model on one 750x750 tile of the test mosaic.
# The bare name `imread` was never imported; use the skimage.io module that is.
test_img = skimage.io.imread('/content/drive/MyDrive/substation_satellite_data/test/mosaic_test.jpg')

i = 1
j = 3

# The tile is placed in the top-left corner of a zero-filled 768x768 canvas.
# NOTE(review): 768 is presumably chosen as the next multiple of 32 above 750,
# to suit the U-Net's downsampling - confirm.
image = np.zeros((768, 768, 3))
image[:750, :750] = test_img[i*750:(i+1)*750, j*750:(j+1)*750]
image = get_preprocessing(preprocessing_fn=preprocess_input)(image=image)['image']
image = np.expand_dims(image, axis=0)

# Drop the batch axis and the padded border, keep the single output channel.
out_mask = model.predict(image)[0, :750, :750, 0]

visualize(Input=test_img[i*750:(i+1)*750, j*750:(j+1)*750], Model_Results=out_mask, Predicted=out_mask.round())