備忘録とか日常とか

学んだこととかを書きます。

画像を小領域に分割して学習 theano

Deep learningは十分な量のデータセットがあって初めて効果を発揮することは周知の事実である。対象がなんにせよ、初めからそこまで用意できないのであればわざわざそれを使うことなく、従来のような特徴量抽出でもって機械学習したほうが性能は出るはず。
なのだが今現在研究で使っているデータセットは二千枚程度(3クラス)で、あまり多いとは言えない。特徴量抽出による研究は歴代の先輩方がずっと研究しており、今年になって自分が別のアプローチ(Deep learningを使ってみる)を指示されたわけである。

そんなわけで、いわゆるdata augmentationによってデータ量を水増しすることで実験している。
使っている画像はいわゆる一般物体認識のようなものではなく、ある一定のテクスチャパターンが画像全体に写っているようなものである。顕微鏡下で撮られた細胞培養の写真とか、内視鏡による患部の画像とか、そういうの感じのやつ。そのパターンを3クラスに識別し、画像中にどのクラスのパターンが最も多いか、また画像のどの部分にどのクラスがあるかを可視化することが目的である。


とりあえずtheanoでネットワークに使うlayerを定義する。

import numpy

import theano
import theano.tensor as T


#use cuda-convnet from pylearn2
from pylearn2.sandbox.cuda_convnet.filter_acts import FilterActs
from pylearn2.sandbox.cuda_convnet.pool import MaxPool
from theano.sandbox.cuda.basic_ops import gpu_contiguous


class HiddenLayer(object):
    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh, drop_rate=False):
        """
        multi layer perceptron
        rng: numpy.random.RandomState  重み初期化に使用
        input: theano.tensor.dmatrix  (n_examples, n_in)のテンソル
        :param input: a symbolic tensor of shape (n_examples, n_in)
        n_in: int  inputの次元
        no_out: int  hidden_layerのユニット数
        activation: theano.Op or function  活性化関数
        """
        self.input = input
        
        if W is None:
            W_values = numpy.asarray(
                rng.uniform(
                    low=-numpy.sqrt(6. / (n_in + n_out)),
                    high=numpy.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4

            W = theano.shared(value=W_values, name='W', borrow=True)

        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)

        self.W = W
        self.b = b

        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        
        if drop_rate:
            print "dropout in hiddenlayer."
            self.output = dropout(numpy.random.RandomState(123), self.output, drop_rate)
            
        # parameters of the model
        self.params = [self.W, self.b]


def dropout(rng, x, p=0.5):
    """
    rng: numpy.random.RandomState  
    x: hidden_layerのoutput
    p: float  drop_out の確率
    """
    if p > 0. and p < 1.:
        seed = rng.randint(2 ** 30)
        srng = theano.tensor.shared_randomstreams.RandomStreams(seed)
        mask = srng.binomial(n=1, p=1.-p, size=x.shape,
                dtype=theano.config.floatX)
        return x * mask
    return x


class LogisticRegression(object):
    def __init__(self, input, n_in, n_out):
        """ Initialize the parameters of the logistic regression
        input: theano.tensor.TensorType  
        n_in: int  入力数,データの次元
        n_out: int  出力数,ラベルの数
        """
        # start-snippet-1
        # initialize with 0 the weights W as a matrix of shape (n_in, n_out)
        self.W = theano.shared(
            value=numpy.zeros(
                (n_in, n_out),
                dtype=theano.config.floatX
            ),
            name='W',
            borrow=True
        )
        # initialize the biases b as a vector of n_out 0s
        self.b = theano.shared(
            value=numpy.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )

        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)

        # parametars of the model
        self.params = [self.W, self.b]

        self.input = input
        

    def negative_log_likelihood(self, y, ):
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
        # end-snippet-2


    def errors(self, y):
        # check if y has same dimension of y_pred
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # check if y is of the correct datatype
        if y.dtype.startswith('int'):
            # the T.neq operator returns a vector of 0s and 1s, where 1
            # represents a mistake in prediction
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()


class ConvLayer(object):    
    
    def __init__(self, input,filter_shape, image_shape, st=1,
                 padding=0, rng=numpy.random.RandomState(23455), W_init="fan"):
        """define Convolutional layer
        input: 4Dtensor シンボル 形はimage_shapeと同じ
        filter_shape: tuple  (フィルタ数, 特徴マップ数, 高さ, 幅)
        image_shape: tuple 上と同じ
        st: int  フィルタのストライド
        W_init: "fan" or "gaussian" or "uniform"
        """
        
        assert image_shape[1]==filter_shape[1]
        self.input = input
        
        # fan-inとfan-outからフィルタを初期化
        if W_init == "fan":
            fan_in = numpy.prod(filter_shape[1:])
            fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) /
                       numpy.prod((3, 3)))
            W_bound = numpy.sqrt(6. / (fan_in + fan_out))
            self.W = theano.shared(
                numpy.asarray(
                    rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                    dtype=theano.config.floatX
                ),
                borrow=True
            )
            
        elif W_init == "gaussian":
            sigma = 0.01
            tmp = sigma * numpy.random.standard_normal(filter_shape)
            self.W = theano.shared(numpy.asarray(tmp,dtype=theano.config.floatX), borrow=True)
            
        elif W_init == "uniform":
            a = 0.05
            tmp = 2 * a * ( numpy.random.random_sample(filter_shape) - 0.5)
            self.W = theano.shared(numpy.asarray(tmp,dtype=theano.config.floatX), borrow=True)
        
        
        # biasを1Dtensorで初期化
        b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
        self.b = theano.shared(value=b_values, borrow=True)
        
        # cuda-convnetによりconvolution----------------------------------------
        input_shuffled = input.dimshuffle(1,2,3,0)  # bc01 -> c01b
        filters_shuffled = self.W.dimshuffle(1,2,3,0)
    
        conv_op = FilterActs(stride=st, pad=padding, partial_sum=1)
            
        contiguous_input = gpu_contiguous(input_shuffled)
        contiguous_filters = gpu_contiguous(filters_shuffled)
        conv_out_shuffled = conv_op(contiguous_input, contiguous_filters)
        
        conv_out = conv_out_shuffled.dimshuffle(3,0,1,2)
        #---------------------------------------------------------------------
        
        # バイアスを足してreluに通す
        self.output = relu(conv_out + self.b.dimshuffle('x', 0, 'x', 'x'))

        self.params = [self.W, self.b]
        self.input = input
        
        
class PoolLayer(object):
    
    def __init__(self, input, poolsize, st, shuffling=True):
        """define pooling layer
        input: 4Dtensor  シンボル
        poolsize: int  プーリングのサイズ
        st: int  ストライド
        """
        
        # cuda-convnet よりpooling--------------------------------------------
        input_shuffled = input.dimshuffle(1,2,3,0)
        
        pool_op = MaxPool(ds=poolsize, stride=st)
        pooled_out_shuffled = pool_op(input_shuffled)
        
        pooled_out = pooled_out_shuffled.dimshuffle(3,0,1,2)
        #------------------------------------------------------------------------
        
        self.output = pooled_out
        
        
def relu(x):
    """
    rectified linear unit
    """"
    return T.switch(x<0, 0, x)

ほぼほぼDeepLearningTutorialの内容と同じだが、ちょっとだけ書き換えてる。
ConvolutionとPoolingを別々に定義し、それぞれにpylearn2からcuda-convnetのFilterActs()MaxPool()を使っている。あとdropoutとかreluとか。正規化層も書くべきかと思ったが、データセットがすでに特殊な環境下で撮られた(ある程度正規化された?)ものなので、どれぐらい効果があるのかはわからない...とりあえず後回し。
コーディングへたくそすぎて可読性皆無なのだが、そこはご愛敬ということで・・・
実際使うときはTutorialと同じ感じなので割愛。


次に画像をcropしていく。まず使う画像はクラスごとにディレクトリに入れて、0,1,2,...とリネームして同じ階層に入れておく。
そこで次の関数を使う。

def getFilelist_all(path):
    """
    指定したディレクトリ内の画像を再帰的に読み込んで,pathのlistで返す
    """
    t = []
    for root, dirs, files  in  os.walk(path):
        for file in files:
            if file.endswith(".jpg") or file.endswith(".png") or file.endswith(".bmp"):
                t.append(os.path.join(root,file))
        for dir in dirs:
            print "reading "+str(dir)+" directory..."
            getFilelist_all(os.path.join(root,dir))
    print str(len(t))+" files read."
    return t

その後cropしていくわけだが、その際に小領域のsizeと、それを動かすstepを指定できるようにした。イメージとしては畳み込みのフィルタの動かし方と同じ感じ。size>stepとすればoverlapしてcropすることもできる。

def crop_img(img_path,size,step,augmentation):
    """
    img_path:  str  画像データのパス
    size:  int  cropサイズ
    step:  int  crop_step
    augmentation: "reverse" -> 左右反転,  "ratate" -> 45度ずつ回転
    """
    if augmentation=="reverse":
        origin = Image.open(img_path)
        reverse = origin.transpose(Image.FLIP_LEFT_RIGHT)
        
        img1 = numpy.array(origin,"f")
        h,w,c = img1.shape
        label = int(img_path.split("/")[-2])
        result = []
        img2 = numpy.array(reverse,"f")
        imgs = [img1, img2]
        
        for img in imgs:
            img = img.transpose(2,0,1)
            for j in range((h-size)/step + 1):
                for i in range( (w-size)/step + 1):
                    tmp = img[:, j*step:(j*step)+size, i*step:(i*step)+size]
                    # flattenして(画像,ラベル)のタプルでリストに格納
                    result.append((tmp.flatten(),label))
            
        return result
        
    elif augmentation=="rotate":
        origin = Image.open(img_path)
        rotate1 = origin.rotate(90)
        rotate2 = origin.rotate(180)
        rotate3 = origin.rotate(270)
        
        img1 = numpy.array(origin,"f")
        h,w,c = img1.shape
        label = int(img_path.split("/")[-2])
        result = []
        
        img2 = numpy.array(rotate1,"f")
        img3 = numpy.array(rotate2,"f")
        img4 = numpy.array(rotate3,"f")
        imgs = [img1, img2, img3, img4]
        
        for img in imgs:
            img = img.transpose(2,0,1)
            for j in range((h-size)/step + 1):
                for i in range( (w-size)/step + 1):
                    tmp = img[:, j*step:(j*step)+size, i*step:(i*step)+size]
                    # flattenして(画像,ラベル)のタプルでリストに格納
                    result.append((tmp.flatten(),label))
            
        return result

    else:
        # ndarrayで画像読み込み
        img = numpy.array(Image.open(img_path), "f")
        h,w,c = img.shape
        label = int(img_path.split("/")[-2])
        result = []
    
        # (row, column, channel) => (channel, row, column)
        img = img.transpose(2,0,1)
    
        for j in range( (h-size)/step + 1):
            for i in range( (w-size)/step + 1):
                tmp = img[:, j*step:(j*step)+size, i*step:(i*step)+size]
                # flattenして(画像,ラベル)のタプルでリストに格納
                result.append((tmp.flatten(),label))

        return result

変数augmetationを"reverse"とすると左右反転、"rotate"とすると45度回転×4枚も加える。
あとは以下の関数でnumpy.ndarrayとして返せば、データセットとして使えるようになる。

def make_dataset(file_list,size,step,augmentation):
    print "cropping images..."
    data = []
    for file_path in file_list:
        tmp = crop_img(file_path, size, step, augmentation)
        data += tmp
    
    # データセットをシャッフル
    random.shuffle(data)
    
    img_set = []
    label_set = []
    a=b=c=0
    for img,label in data:
        if label == 0:
            a += 1
        elif label == 1:
            b += 1
        else:
            c += 1
        img_set.append(img)
        label_set.append(label)
    
    img_set = numpy.asarray(img_set)
    label_set = numpy.asarray(label_set)
    
    print "A:%d , B:%d , C:%d" % (a, b, c)
    return img_set, label_set


def load_dataset(path,size,step,augmentation=None):
    file_list = getFilelist_all(path)
    return make_dataset(file_list,size,step, augmentation)

一応crop後の各クラスの総数を表示するようにしている。


同じデータセットで実験してみても、今のところ特徴量抽出とrandom forestなどの識別器の性能には勝てていない。。
やっぱりチューニングが難しいので結局手間はあんまり変わらないのかな。