Source code for tnmf

# -*- coding: utf-8 -*-
"""
tnmf.py
~~~~~~~
.. topic:: Contents

    The tnmf module is used to perform task-driven nonnegative
    matrix factorisation (TNMF).
    It includes the SupervisedDL class with its fit and score methods.

    Created on Wed Jun 29 16:37:28 2016

    @authors: bisot, serizel

    .. [#] V. Bisot, R. Serizel, S. Essid, and G. Richard.
        "Feature Learning with Matrix Factorization Applied to
        Acoustic Scene Classification".
        Accepted for publication in *IEEE Transactions on Audio,
        Speech and Language Processing*, 2017
"""

import numpy as np
import spams
from sklearn import decomposition
import beta_ntf
from sklearn import preprocessing
from sklearn.metrics import accuracy_score, f1_score
from sklearn.linear_model import LogisticRegression
import random


class SupervisedDL(object):
    """ Supervised DL class.

    Task-driven Dictionary Learning with a modified algorithm.

    Parameters
    ----------
    data : array, shape (n_samples, n_features)
        Training data matrix.
        Needs to be provided if initialization is done in the model.
    n_components : int
        Size of the dictionary
    n_labels : int
        Number of classes
    pos : bool, default: True
        When set to True, the model is fit in its nonnegative formulation
    n_iter : int
        Number of epochs on which to run the algorithm
    lambda1 : float, default: 0
        Parameter controlling the l1-norm penalty for the projection step
    lambda2 : float, default: 0
        Parameter controlling the l2-norm penalty for the projection step
    rho : float, default: 0.001
        Initial gradient step parameter
    mu : float, default: 1
        Regularization strength of the classifier; must be positive.
        Smaller values specify stronger regularization.
    agreg : int, optional (default: 1)
        When set, classification is done on bags of agreg successive
        projections. Every successive group of agreg projections
        (without overlap) is averaged before classification.
    init : str, {'random', 'NMF', 'DictionaryLearning'}
        Controls the nature of the dictionary initialization.

        * Use 'random' for a random initialization of the dictionary
        * Use 'NMF' to initialize with nonnegative matrix factorization
        * Use 'DictionaryLearning' to initialize with the scikit-learn
          DictionaryLearning class
    n_iter_init : int, optional
        Number of iterations for the dictionary initialization
    max_iter_init : int, optional
        Maximum number of iterations for the classifier initialization
    max_iter_inloop : int, optional
        Maximum number of iterations at each epoch for the classifier update
    batch_size : int, optional
        Size of the batch (1 for stochastic gradient)
    verbose : int
        Set verbose to any positive number for verbosity.

    Attributes
    ----------
    clf : object
        Classifier
    D : array
        Dictionary
    """

    def __init__(self, data=np.asarray([[0, 0]]), n_components=64,
                 n_labels=2, pos=True, n_iter=1, lambda1=0, lambda2=0,
                 rho=0.001, verbose=0, mu=1, agreg=1, init='random',
                 n_iter_init=10, batch_size=6250, max_iter_init=10,
                 max_iter_inloop=1):
        self.data = data
        self.data_shape = data.shape
        self.n_components = n_components
        self.verbose = verbose
        self.lambda1 = lambda1
        self.lambda2 = lambda2
        self.n_iter = n_iter
        self.n_labels = n_labels
        self.init = init
        self.rho = rho
        self.mu = mu
        self.agreg = agreg
        self.pos = pos
        self.analysis = np.zeros(shape=(n_iter, 5))
        self.batch_size = batch_size
        self.max_iter_init = max_iter_init
        self.max_iter_inloop = max_iter_inloop
        self.clf = LogisticRegression(
            C=self.mu, multi_class='multinomial', solver='lbfgs',
            max_iter=self.max_iter_init, warm_start=True)

        if self.init == 'random':
            if self.pos:
                self.D = np.abs(np.random.randn(self.data_shape[1],
                                                self.n_components))
            else:
                self.D = np.random.randn(self.data_shape[1],
                                         self.n_components)
            self.D = preprocessing.normalize(self.D, axis=0)
        if self.init == 'NMF':
            if self.verbose > 0:
                print("Initializing dictionary with beta_ntf")
            ntf = beta_ntf.BetaNTF(data_shape=self.data_shape,
                                   n_components=self.n_components,
                                   n_iter=n_iter_init, verbose=False,
                                   beta=2)
            ntf.fit(self.data)
            self.D = ntf.factors_[1]
            self.D = preprocessing.normalize(self.D, axis=0)
        if self.init == 'DictionaryLearning':
            if self.verbose > 0:
                print("Initializing dictionary with sklearn "
                      "DictionaryLearning")
            u_dl = decomposition.DictionaryLearning(
                n_components=self.n_components, alpha=0,
                max_iter=n_iter_init)
            u_dl.fit(self.data)
            self.D = preprocessing.normalize(u_dl.components_.T, axis=0)
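A minimal instantiation sketch on synthetic nonnegative data (the array
sizes and the variable names X and rng below are illustrative, not part of
the module)::

    import numpy as np
    from tnmf import SupervisedDL

    rng = np.random.RandomState(0)
    # 200 hypothetical frames with 60 features each
    X = np.abs(rng.randn(200, 60))
    model = SupervisedDL(data=X, n_components=32, n_labels=2,
                         pos=True, n_iter=5, lambda1=0.1,
                         init='random', verbose=1)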
    def mean_frames(self, X=0, agreg=15):
        """ Averages every successive group of agreg rows in a matrix.

        Parameters
        ----------
        X : array, shape (n_samples, n_features)
            Matrix to reduce by averaging
        agreg : int
            Size of the groups to average

        Returns
        -------
        X_mean : array, shape (n_samples/agreg, n_features)
            Averaged matrix
        """
        return np.mean(np.reshape(X, (X.shape[0] // agreg, agreg, -1)),
                       axis=1)
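A quick illustrative check of mean_frames, continuing the sketch above
(array values are made up)::

    # 4 rows averaged in groups of 2 yield 2 rows
    X_small = np.arange(8.).reshape(4, 2)
    model.mean_frames(X=X_small, agreg=2)
    # array([[1., 2.],
    #        [5., 6.]])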
    def project_data(self, X, agreg=1):
        """ Projects data on the model dictionary.

        Parameters
        ----------
        X : array, shape (n_samples, n_features)
            Matrix to project on the dictionary
        agreg : int
            Size of the groups to average after projection

        Returns
        -------
        projections : array, shape (n_components, n_samples/agreg)
            Projection matrix
        """
        alpha_mat = spams.lasso(np.asfortranarray(np.transpose(X)),
                                D=np.asfortranarray(self.D),
                                lambda1=self.lambda1,
                                lambda2=self.lambda2,
                                mode=2, pos=self.pos)
        alpha_mat = alpha_mat.toarray()
        if agreg > 1:
            # Average every group of agreg successive frames, i.e. the
            # columns of alpha_mat (which is n_components x n_samples)
            return np.mean(
                np.reshape(alpha_mat, (alpha_mat.shape[0], -1, agreg)),
                axis=2)
        return alpha_mat
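A projection sketch, continuing the example above and assuming the spams
toolbox is installed (X_new is an illustrative name)::

    # Project 20 held-out frames; alpha has shape (n_components, 20)
    X_new = np.abs(rng.randn(20, 60))
    alpha = model.project_data(X_new, agreg=1)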
    def predict(self, X):
        """ Predicts labels from a given matrix using the model classifier.

        Parameters
        ----------
        X : array, shape (n_samples, n_features)
            Matrix to project on the dictionary

        Returns
        -------
        y_pred : array, shape (n_samples/agreg,)
            Predicted labels
        """
        alpha_mat = np.transpose(self.project_data(X=X, agreg=self.agreg))
        y_pred = self.clf.predict(alpha_mat)
        return y_pred
    def fit(self, X_train, y_train,
            X_test=np.array([]), y_test=np.array([])):
        """ Fits the model to input training data.

        Parameters
        ----------
        X_train : array, shape (n_samples, n_features)
            Training data matrix
        y_train : array-like, shape (n_samples/self.agreg,)
            Target vector relative to X_train
        X_test : array, shape (n_samples_test, n_features), optional
            Test data matrix.
            Useful only to check performance during development
        y_test : array-like, shape (n_samples_test/self.agreg,), optional
            Target vector relative to X_test
        """
        for i in range(self.n_iter):
            # Classifier update step #
            # Project the full training data on D
            alpha_mat = spams.lasso(
                np.asfortranarray(X_train.T),
                D=np.asfortranarray(self.D),
                lambda1=self.lambda1, lambda2=self.lambda2,
                mode=2, pos=self.pos)
            # Average projections if necessary
            alpha_mean = self.mean_frames(alpha_mat.toarray().T,
                                          agreg=self.agreg)
            alpha_mean = preprocessing.scale(alpha_mean, with_mean=False)
            # Update classifier
            self.clf.fit(alpha_mean, y_train)
            self.w = self.clf.coef_
            self.b = self.clf.intercept_
            if i == 0:
                # The classifier is fully initialized on the first epoch.
                # For further epochs, the logistic regression is only
                # updated for max_iter_inloop iterations with warm restart.
                self.clf.max_iter = self.max_iter_inloop

            # Print current performance #
            if self.verbose > 0:
                print("Iteration number %i \n" % i)
                if X_test.any():
                    a, f1 = self.scores(X=X_test, y=y_test)
                    print("Classification scores on test set: "
                          "a=%0.3f f1=%0.3f" % (a, f1))
                a, f1 = self.scores(X=X_train, y=y_train)
                print("Classification scores on train set: "
                      "a=%0.3f f1=%0.3f" % (a, f1))

            # Dictionary update step #
            draw = list(range(self.agreg * len(y_train)))
            random.shuffle(draw)
            nb_batch = int(np.ceil(len(draw) / self.batch_size))
            for t in range(nb_batch):
                # Select and project a batch of data points
                ind = draw[t * self.batch_size:
                           min((t + 1) * self.batch_size, len(draw))]
                x_mat = np.transpose(X_train[ind, ])
                ind = [x // self.agreg for x in ind]
                y = y_train[ind]
                alpha_mat = spams.lasso(np.asfortranarray(x_mat),
                                        D=np.asfortranarray(self.D),
                                        lambda1=self.lambda1,
                                        lambda2=self.lambda2,
                                        mode=2, pos=self.pos)
                alpha_mean = alpha_mat.toarray()
                # Skip the update if the batch projections are all zero
                if alpha_mean.nonzero()[0].size:
                    # Step-size decay heuristic
                    rho_t = np.min(
                        [self.rho,
                         self.rho
                         * (nb_batch * self.n_iter * self.batch_size)
                         / (10 * ((i * nb_batch * self.batch_size)
                                  + t * self.batch_size + 1))])
                    # Gradient of the loss with respect to the projections
                    denom = np.zeros((alpha_mat.shape[1], ))
                    num_alpha = np.zeros(alpha_mat.shape)
                    for k in range(self.n_labels):
                        tmp = np.exp(np.dot(self.w[k, :], alpha_mean)
                                     + self.b[k])
                        denom += tmp
                        num_alpha += np.dot(self.w[k, :][:, np.newaxis],
                                            tmp[:, np.newaxis].T)
                    d_alpha = (num_alpha.T / denom[:, np.newaxis]
                               - self.w[y, :])
                    # Update D
                    self.update_D(
                        x_mat=x_mat, y=y, denom=denom,
                        num_alpha=num_alpha, d_alpha=d_alpha,
                        alpha_mat=alpha_mat, rho_t=rho_t)

        # Print final performance #
        if self.verbose > 0:
            print("Final model")
            if X_test.any():
                a, f1 = self.scores(X=X_test, y=y_test)
                print("Classification scores on test set: "
                      "a=%0.3f f1=%0.3f" % (a, f1))
            a, f1 = self.scores(X=X_train, y=y_train)
            print("Classification scores on train set: "
                  "a=%0.3f f1=%0.3f" % (a, f1))
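An end-to-end sketch, continuing the example above (the labels y are
synthetic and purely illustrative)::

    # One label per frame, since agreg=1 in this sketch
    y = rng.randint(0, 2, size=200)
    model.fit(X, y)
    y_pred = model.predict(X)
    acc, f1 = model.scores(X, y)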
    def scores(self, X, y):
        """ Computes classification scores (accuracy and weighted F1-score)
        on a given dataset.

        Parameters
        ----------
        X : array, shape (n_samples, n_features)
            Data matrix
        y : array-like, shape (n_samples/self.agreg,)
            Target vector relative to X

        Returns
        -------
        a : float
            Accuracy score
        f1 : float
            Weighted F1-score
        """
        alpha_mat = spams.lasso(
            np.asfortranarray(X.T),
            D=np.asfortranarray(self.D),
            lambda1=self.lambda1, lambda2=self.lambda2,
            mode=2, pos=self.pos)
        alpha_mean = self.mean_frames(alpha_mat.toarray().T,
                                      agreg=self.agreg)
        alpha_mean = preprocessing.scale(alpha_mean, with_mean=False)
        y_pred = self.clf.predict(alpha_mean)
        a = accuracy_score(y, y_pred)
        f1 = f1_score(y, y_pred, average='weighted')
        return a, f1
    def update_D(self, x_mat=0, y=0, alpha_mat=0, denom=0, num_alpha=0,
                 d_alpha=0, rho_t=0.001):
        """ Updates the dictionary.

        Parameters
        ----------
        x_mat : array, shape (n_features, batch_size)
            Input data (batch_size = 1 in the stochastic gradient case)
        y : array, shape (batch_size,)
            Labels corresponding to the input data
        alpha_mat : sparse array, shape (n_components, batch_size)
            Projections
        d_alpha : array, shape (batch_size, n_components)
            Gradient of the loss with respect to the projections
        denom : array, shape (batch_size,)
            Gradient denominator
        num_alpha : array, shape (n_components, batch_size)
            Gradient numerator
        rho_t : float
            Learning rate
        """
        non_zero = alpha_mat.nonzero()
        beta = np.zeros(num_alpha.shape)
        for i in range(num_alpha.shape[1]):
            # Active set of the i-th sample (nonzero coefficients)
            ind = non_zero[0][non_zero[1] == i]
            if ind.shape[0] > 1:
                # Solve (D_act' D_act + lambda2 I) beta = d_alpha on the
                # active set; the ridge term acts on the diagonal only
                beta[ind, i] = np.dot(
                    spams.invSym(
                        np.asfortranarray(
                            np.dot(np.transpose(self.D[:, ind]),
                                   self.D[:, ind])
                            + self.lambda2 * np.eye(ind.shape[0]))),
                    d_alpha[i, ind])
            elif ind.shape[0] == 1:
                beta[ind, i] = np.dot(
                    1. / (np.dot(np.transpose(self.D[:, ind]),
                                 self.D[:, ind]) + self.lambda2),
                    d_alpha[i, ind])
        alpha_mat = alpha_mat.toarray()
        d_D = np.dot(x_mat - np.dot(self.D, alpha_mat),
                     np.transpose(beta))
        d_D -= np.dot(np.dot(self.D, beta), np.transpose(alpha_mat))
        self.D = self.D - rho_t * d_D
        if self.pos:
            # Project back onto the nonnegative orthant
            self.D[self.D < 0] = 1e-7
        self.D = preprocessing.normalize(self.D, axis=0)
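Following the task-driven dictionary learning formulation, beta on the
active set solves the linear system (D_act' D_act + lambda2 I) beta =
d_alpha. A dense numpy equivalent of the spams.invSym branch above, for a
single sample (the names ind and d_alpha_i are illustrative)::

    # Hypothetical dense check of the beta system for one active set
    D_act = model.D[:, ind]
    H = D_act.T @ D_act + model.lambda2 * np.eye(ind.shape[0])
    beta_i = np.linalg.solve(H, d_alpha_i)  # d_alpha_i: loss gradient
                                            # restricted to the active set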