# -*- coding: utf-8 -*-
"""
tgnmf.py
~~~~~~~~
.. topic:: Contents
The tgnmf module is used to perform task-driven group nonnegative
matrix factorisation (TGNMF).
It includes the SupervisedDL class, fit and score methods
Created on Wed Jun 29 16:37:28 2016
@authors: bisot, serizel
.. [#] V.Bisot, R. Serizel, S. Essid, and G. Richard.
"Feature Learning with Matrix Factorization Applied to
Acoustic Scene Classification".
Accepted for publication in *IEEE Transactions on Audio,
Speech and Language Processing*, 2017
.. [#] R. Serizel, V.Bisot, S. Essid, and G. Richard.
“Supervised group nonnegative matrix factorisation with similarity
constraints and applications to speaker identification”.
In Proc. of *2017 IEEE International Conference on Acoustics,
Speech and Signal Processing (ICASSP)*, 2017.
"""
import numpy as np
import spams
from sklearn import decomposition
import beta_ntf
from sklearn import preprocessing
from sklearn.metrics import accuracy_score, f1_score
from sklearn.linear_model import LogisticRegression
import random
import time
from sklearn.utils.extmath import (
logsumexp, log_logistic, safe_sparse_dot,
softmax, squared_norm)
from sklearn.utils.validation import (
DataConversionWarning,
check_X_y, NotFittedError)
from sklearn.preprocessing import LabelEncoder, LabelBinarizer
[docs]class SupervisedDL(object):
""" Supervised DL class
Task-driven Dictionary Learning with modified algorithm
Parameters
----------
data : array, shape (n_samples, n_features)
Training data matrix
Needs to be provided if initialization is done in the model
n_labels : int
number of classes
pos : bool, default: True
When set to True, the model is fit in its nonnegative formulation
n_iter : int
Number of epochs on which to run the algorithm
lambda1 : float, default: 0.1
Paramter controlling the l1 norm penality for the projection step
lambda2 : float, default: 0
Paramter controlling the l2 norm penality for the projection step
rho : float, default: 0.001
Initial gradient step parameter
mu : float, default: 1
Regularization strenght of the classifier; must be positive.
Smaller values specify stronger regularization.
agreg : int, optional (default: 1)
In the case classification is done on bags of agreg successive
projections. Every successive group of agreg projections
(without overlapp) will be averaged before classification
init : str, {'random', 'nmf', 'dic-learning'}
Controls the nature of the dictionary initialization.
* Use 'random' for a random initalization of
* Use 'nmf' for intializaing with nonnegative matrix factorization
* Use 'dic-learning' to intilaize with the scikit-learn DictionaryLearning class
n_iter_init : int, optional
Number of iterations for the dictionary initialization
max_iter_init : int, optional
Maximum number of iterations for the classifier initialization
max_iter_inloop : int, optional
Maximum number of iterations at each epoch for the classifier update
max_iter_fin : int, optional
Maximum number of iterations once the dictionnary is learnt
batch_size : int, optional
Size of the batch (1 for stochastic gradient)
lbl : array (n_couples, 2)
Unique (class, session) couples (See also [3]_)
* 'lbl[:, 0]': class labels
* 'lbl[:, 1]': session labels
This is equivalent to the attribute 'self.iters['cls']' in GNMF_
ses_train : array (n_samples, 1)
Session labels for the data
sub_dict_size : int
Size of the sub-dictionnaries related to a unique couple (cls, ses)
See also [3]_
k_cls : int
Number of components that are affected to cls related bases.
See also [3]_
k_ses : int
Number of components that are affected to ses related bases.
See also [3]_
nu1 : float
Class similarity constraint
See also [3]_
nu2 : float
Session similarity constraint
See also [3]_
verbose : int
Set verbose to any positive number for verbosity.
Attributes
----------
clf : object
Classifier
D : array
Dictionnary
n_components : int
Size of the dictionary
dist_ses : array
Distance between bases related to the same session
dist_cls :
Distance between bases related to the same class
cst : array
Update constraint computed from dist_ses and dist_cls
References
----------
.. [#] R. Serizel, S. Essid, and G. Richard.
“Group nonnegative matrix factorisation with speaker and session
variability compensation for speaker identification”.
In Proc. of *2016 IEEE International Conference on Acoustics,
Speech and Signal Processing (ICASSP)*, pp. 5470-5474, 2016.
.. _GNMF: http://rserizel.github.io/groupNMF/_modules/beta_nmf_class.html#ClassBetaNMF
"""
def __init__(self, data=np.asarray([[0, 0]]),
n_labels=2, pos=True, n_iter=1,
lambda1=0, lambda2=0, rho=0.001, verbose=0, mu=1, agreg=1,
init='random', n_iter_init=10, batch_size=6250,
max_iter_init=10, max_iter_inloop=1, max_iter_fin=0,
lbl=np.asarray([[0, 0]]), ses_train=0,
sub_dict_size=1, k_cls=0, k_ses=0, nu1=0, nu2=0):
self.data = data
self.data_shape = data.shape
self.verbose = verbose
self.lambda1 = lambda1
self.lambda2 = lambda2
self.n_iter = n_iter
self.n_labels = n_labels
self.init = init
self.rho = rho
self.mu = mu
self.agreg = agreg
self.pos = pos
self.analysis = np.zeros(shape=(n_iter, 5))
self.batch_size = batch_size
self.max_iter_init = max_iter_init
self.max_iter_inloop = max_iter_inloop
self.max_iter_fin = max_iter_fin
self.clf = LogisticRegression(
C=self.mu, multi_class='multinomial',
solver='lbfgs', max_iter=self.max_iter_init, warm_start=True)
self.lbl = lbl
self.sub_dict_size = sub_dict_size
self.k_cls = k_cls
self.k_ses = k_ses
self.n_components = lbl.shape[0]*(k_cls + k_ses)
self.nu1 = nu1
self.nu2 = nu2
self.ses_train = ses_train
if self.init == 'random':
if self.pos is False:
self.D = np.random.randn(self.data_shape[1],
self.n_components)
if self.pos is True:
self.D = np.abs(np.random.randn(self.data_shape[1],
self.n_components))
self.D = preprocessing.normalize(self.D, axis=0)
if self.init == 'NMF':
if self.verbose > 0:
print("Initializing dictionnary with beta_ntf")
ntf = beta_ntf.BetaNTF(data_shape=self.data_shape,
n_components=self.n_components,
n_iter=n_iter_init, verbose=False, beta=2)
ntf.fit(self.data)
self.D = ntf.factors_[1]
self.D = preprocessing.normalize(self.D, axis=0)
if self.init == 'DictionnaryLearning':
if self.verbose > 0:
print(""" Initializing dictionnary
with sklearn DictionnaryLearning""")
u_dl = decomposition.DictionaryLearning(
n_components=self.n_components,
alpha=0, max_iter=n_iter_init)
u_dl.fit(self.data)
self.D = preprocessing.normalize(
u_dl.components_.T, axis=0)
self.compute_cst()
[docs] def compute_cst(self):
""" Compute the update constraints based on the distance between
session related bases and the distance between class related bases"""
lbl = self.lbl
cls_card = np.zeros((int(np.amax(lbl[:, 0])), ))
cls_sum = np.zeros((int(np.amax(lbl[:, 0])),
self.D.shape[0], self.k_cls))
self.dist_cls = 0
for i in range(len(cls_card)):
cls_card[i] = max(lbl[lbl[:, 0] == i].shape[0] - 1, 0)
ref_sub_ind = np.arange(i*self.sub_dict_size,
i*self.sub_dict_size + self.k_cls)
for j in np.where(lbl[lbl[:, 0] == i])[0]:
sub_ind = np.arange(j*self.sub_dict_size,
j*self.sub_dict_size + self.k_cls)
cls_sum[i, ] += self.D[:, sub_ind]
self.dist_cls += np.linalg.norm(
self.D[:, sub_ind] - self.D[:, ref_sub_ind])
ses_card = np.zeros((int(np.amax(lbl[:, 1])), ))
ses_sum = np.zeros((int(np.amax(lbl[:, 1])),
self.D.shape[0], self.k_ses))
self.dist_ses = 0
for i in range(len(ses_card)):
ses_card[i] = max(lbl[lbl[:, 1] == i].shape[0] - 1, 0)
ref_sub_ind = np.arange(
i*self.sub_dict_size + self.k_cls,
i*self.sub_dict_size + self.k_cls + self.k_ses)
for j in np.where(lbl[lbl[:, 1] == i])[0]:
sub_ind = np.arange(
j*self.sub_dict_size + self.k_cls,
j*self.sub_dict_size + self.k_cls + self.k_ses)
ses_sum[i, ] += self.D[:, sub_ind]
self.dist_ses += np.linalg.norm(
self.D[:, sub_ind] - self.D[:, ref_sub_ind])
self.cst = np.zeros((self.D.shape))
for i in range(lbl.shape[0]):
cls = int(lbl[i, 0]-1)
ses = int(lbl[i, 1]-1)
sub_ind = np.arange(i*self.sub_dict_size,
(i+1)*self.sub_dict_size)
D_sub = self.D[:, sub_ind]
sub_cst = np.zeros((self.D.shape[0], self.sub_dict_size))
sub_cst[:, :self.k_cls] = self.nu1 * (
cls_card[cls] * D_sub[:, :self.k_cls] -
(cls_sum[cls, ] - D_sub[:, :self.k_cls]))
sub_cst[:, self.k_cls: self.k_cls+self.k_ses] = self.nu2 * (
ses_card[ses] *
D_sub[:, :self.k_cls:self.k_cls+self.k_ses] -
(ses_sum[ses] - D_sub[:, :self.k_cls:self.k_cls+self.k_ses]))
self.cst[:, sub_ind] = sub_cst
[docs] def mean_frames(self, X=0, agreg=15):
"""
Averages every successive group of agreg rows in the a matrix
Parameters
----------
X : array, shape (n_samples, n_features)
Matrix to reduce by averaging
agreg : int
Specifies size of the groups to average
Returns
-------
X_mean: array, shape(n_samples/agreg, n_features)
Averaged matrix
"""
return np.mean(np.reshape(X, (X.shape[0]/agreg, agreg, -1)), axis=1)
[docs] def project_data(self, X, agreg=1):
"""
Projects data on the model dictionary
Parameters
----------
X : array, shape (n_samples, n_features)
Matrix to project on dictoonary
agreg : int
Specifies size of the groups to average after projection
Returns
-------
projections: array, shape(n_samples/agreg, n_components)
Projection matrix
"""
alpha_mat = spams.lasso(np.asfortranarray(np.transpose(X)),
D=np.asfortranarray(self.D),
lambda1=self.lambda1,
lambda2=self.lambda2, mode=2,
pos=self.pos)
alpha_mat = alpha_mat.toarray()
if agreg > 1:
return np.mean(
np.reshape(
alpha_mat,
(alpha_mat.shape[0]/agreg, agreg, -1)),
axis=1)
else:
return alpha_mat
[docs] def predict(self, X):
"""
Predicts labels from a given matrix using the model classifier
Parameters
----------
X : array, shape (n_samples, n_features)
Matrix to project on dictoonary
Returns
-------
y_pred: array, shape(n_samples/agreg,)
Predicted labels
"""
alpha_mat = np.transpose(self.project_data(X=X, agreg=self.agreg))
y_pred = self.clf.predict(alpha_mat)
return y_pred
[docs] def fit(self, X_train, y_train, X_test=np.array([]), y_test=np.array([])):
"""
Fits the model to input training data
Parameters
----------
X_train : array, shape (n_samples, n_features)
Training data matrix
X_test : array, shape (n_samples_test, n_features), optional
Test data matrix
Usefull only to check performance during development
y_train : array-like, shape (n_samples/self.agreg,)
Target vector relative to X_train.
y_train : array-like, shape (n_samples_test/self.agreg,)
Target vector relative to X_test.
Returns
-------
"""
for i in range(self.n_iter):
# Classifier update step #
tic = time.time()
# Project full data on D
alpha_mat = spams.lasso(
np.asfortranarray(X_train.T),
D=np.asfortranarray(self.D),
lambda1=self.lambda1, lambda2=self.lambda2,
mode=2, pos=self.pos)
# Average projections if necessary
alpha_mean = self.mean_frames(
alpha_mat.toarray().T, agreg=self.agreg)
alpha_mean = preprocessing.scale(alpha_mean, with_mean=False)
# Update classifier
self.clf.fit(alpha_mean, y_train)
self.w = self.clf.coef_
self.b = self.clf.intercept_
self. compute_cst()
if i == 0:
# Classifier is initialized on first iteration
# For further iterations, the LR class is only updated on
# 1 iteration with warm restart
self.clf.max_iter = self.max_iter_inloop
# Print current performance #
if self.verbose > 0: # Print the scores
print("Iteration number %i \n" % i)
X2, y = check_X_y(
alpha_mean, y_train, accept_sparse='csr',
dtype=np.float64, order="C")
lbin = LabelBinarizer()
Y_binarized = lbin.fit_transform(y_train)
yo = np.zeros(shape=(self.n_labels, 1))
yo[:, 0] = self.clf.intercept_
yo = np.concatenate((self.clf.coef_, yo), axis=1)
if Y_binarized.shape[1] == 1:
Y_binarized = np.hstack([1 - Y_binarized, Y_binarized])
ut = _multinomial_loss(
yo, X2, Y_binarized, 1/self.mu,
sample_weight=np.ones(y.shape[0]))
yo = np.zeros(shape=(self.n_labels, 1))
print "Classification costs: ", ut[0]
print(
"Class distance: ", self.dist_cls,
"Session distance:", self.dist_ses)
if i == 0:
self.nu1 *= ut[0]/self.dist_cls
self.nu2 *= ut[0]/self.dist_ses
print "Weights nu1 and nu2", self.nu1, self.nu2
self.compute_cst()
if X_test.any():
a, f1 = self.scores(X=X_test, y=y_test)
print(""" Classification scores on test set:
a=%0.3f f1=%0.3f""" % (a, f1))
a, f1 = self.scores(X=X_train, y=y_train)
print(""" Classification scores on train set:
a=%0.3f f1=%0.3f""" % (a, f1))
# Dictionary update step #
draw = range(self.agreg*len(y_train))
random.shuffle(draw)
nb_batch = len(draw)/int(self.batch_size) + 1
for t in range(nb_batch):
# Select and project a data point
ind = draw[
t*self.batch_size: min((t+1)*self.batch_size, len(draw))]
x_mat = np.transpose(X_train[ind, ])
ind = [x/self.agreg for x in ind]
y = y_train[ind]
ses = self.ses_train[ind]
alpha_mat = spams.lasso(np.asfortranarray(x_mat),
D=np.asfortranarray(self.D),
lambda1=self.lambda1,
lambda2=self.lambda2,
mode=2, pos=self.pos)
alpha_mean = alpha_mat.toarray()
if alpha_mean.nonzero()[0].any():
# Step decaying heuristic
rho_t = np.min(
[
self.rho,
self.rho * (
nb_batch*self.n_iter*self.batch_size
) / (
10*(
(i*nb_batch*self.batch_size) +
t*self.batch_size + 1))])
# Gradient of loss with respect to projections
denom = np.zeros((alpha_mat.shape[1], ))
num_alpha = np.zeros(alpha_mat.shape)
for k in range(self.n_labels):
tmp = np.exp(
np.dot(self.w[k, :], alpha_mean) + self.b[k])
denom += tmp
num_alpha += (np.dot(
self.w[k, :][:, np.newaxis],
tmp[:, np.newaxis].T))
d_alpha = (
num_alpha.T / denom[:, np.newaxis]) - self.w[y, :]
# Update D
self.update_D(
x_mat=x_mat, y=y, denom=denom,
num_alpha=num_alpha, d_alpha=d_alpha,
alpha_mat=alpha_mat, rho_t=rho_t, ses=ses)
if self.verbose > 0:
print "Iteration time", time.time() - tic
if self.max_iter_fin > 0:
alpha_mat = spams.lasso(
np.asfortranarray(X_train.T),
D=np.asfortranarray(self.D),
lambda1=self.lambda1, lambda2=self.lambda2,
mode=2, pos=self.pos)
# Average projections if necessary
alpha_mean = self.mean_frames(
alpha_mat.toarray().T, agreg=self.agreg)
alpha_mean = preprocessing.scale(alpha_mean, with_mean=False)
# Update classifier
self.clf.max_iter = self.max_iter_fin
self.clf.fit(alpha_mean, y_train)
# Print final performance #
if self.verbose > 0: # Print the scores
print("Final model")
if X_test.any():
a, f1 = self.scores(X=X_test, y=y_test)
print(""" Classification scores on test set:
a=%0.3f f1=%0.3f""" % (a, f1))
a, f1, = self.scores(X=X_train, y=y_train)
print(""" Classification scores on train set:
a=%0.3f f1=%0.3f""" % (a, f1))
[docs] def scores(self, X, y):
"""
Compute classification scores (accurracy and F1-score).
on a given dataset.
Parameters
----------
X_train : array, shape (n_samples, n_features)
Training data matrix
X_test : array, shape (n_samples_test, n_features), optional
Test data matrix
Usefull only to check performance during development
y_train : array-like, shape (n_samples/self.agreg,)
Target vector relative to X_train.
y_train : array-like, shape (n_samples_test/self.agreg,)
Target vector relative to X_test.
Returns
-------
y_pred: array, shape(n_samples/agreg,)
"""
tic = time.time()
alpha_mat = spams.lasso(
np.asfortranarray(X.T),
D=np.asfortranarray(self.D),
lambda1=self.lambda1,
lambda2=self.lambda2, mode=2,
pos=self.pos)
alpha_mean = self.mean_frames(
alpha_mat.toarray().T, agreg=self.agreg)
alpha_mean = preprocessing.scale(
alpha_mean, with_mean=False)
y_pred = self.clf.predict(alpha_mean)
a = accuracy_score(y, y_pred)
f1 = f1_score(y, y_pred, average='weighted')
return a, f1
[docs] def update_D(self, x_mat=0, y=0, alpha_mat=0, denom=0, num_alpha=0,
d_alpha=0, rho_t=0.001, ses=0):
"""
Updates dictionary
Parameters
----------
x_mat : array shape (n_features, batch_size)
Input data (batch_size = 1 in the stochastic gradient case)
y : array shape (batch_size, )
Labels corresponding to the input data
alpha_mat : array shape (batch_size, n_components)
Projections
d_alpha : array shape (batch_size, n_components)
Gradient of loss with respect to projections
denom : array shape (n_components, )
Gradient denominator
num_alpha : array (batch_size, n_components)
Gradient numerator
rho_t : float
Learning rate
ses : array (batch_size, )
Session labels for the data
"""
non_zero = alpha_mat.nonzero()
beta = np.zeros(num_alpha.shape)
beta_sub = np.zeros(num_alpha.shape)
n_nan = 0
tot_loop = 0
lr_weight = np.zeros((beta.shape[0], ))
alpha_mat = alpha_mat.toarray()
alpha_mat_sub = np.zeros(alpha_mat.shape)
for i in range(num_alpha.shape[1]):
cls_ind = np.where(
((self.lbl[:, 0] == y[i]) & (self.lbl[:, 1] == ses[i])))[0][0]
sub_ind = np.arange(cls_ind*self.sub_dict_size,
(cls_ind+1)*self.sub_dict_size)
ind = non_zero[0][non_zero[1] == i]
lr_weight[sub_ind] += 1
alpha_mat_sub[sub_ind, i] = alpha_mat[sub_ind, i]
if sub_ind.shape[0] > 1:
tot_loop += 1
beta[ind, i] = np.dot(
spams.invSym(
np.asfortranarray(
np.dot(
np.transpose(self.D[:, ind]),
self.D[:, ind]) +
self.lambda2)),
d_alpha[i, ind])
if np.isnan(np.amax(np.abs(beta[sub_ind, i]))):
n_nan += 1
beta[ind, i] = np.zeros(beta[sub_ind, i].shape)
beta_sub[sub_ind, i] = beta[sub_ind, i]
elif sub_ind.shape[0] == 1:
tot_loop += 1
beta[sub_ind, i] = np.dot(
1./(np.dot(np.transpose(self.D[:, sub_ind]),
self.D[:, sub_ind]) + self.lambda2),
d_alpha[i, ind])
alpha_mat_sub[sub_ind, i] = alpha_mat[sub_ind, i]
if n_nan > 0:
if tot_loop/(n_nan+1) < 2:
print "Warning nan occurence", n_nan, "total loops", tot_loop
d_D = np.dot(
(x_mat-np.dot(self.D, alpha_mat_sub)), np.transpose(beta_sub))
d_D -= np.dot(np.dot(self.D, beta_sub), np.transpose(alpha_mat_sub))
sub_ind = np.arange(cls_ind*self.sub_dict_size,
(cls_ind+1)*self.sub_dict_size)
rho_t *= lr_weight/self.batch_size
self.D = self.D - rho_t * d_D - self.cst
if self.pos is True:
self.D[np.where(self.D < 0)] = 0.0000001
self.D = preprocessing.normalize(self.D, axis=0)
def _multinomial_loss(w, X, Y, alpha, sample_weight):
"""Computes multinomial loss and class probabilities.
Parameters
----------
w : ndarray, shape (n_classes * n_features,) or
(n_classes * (n_features + 1),)
Coefficient vector.
X : {array-like, sparse matrix}, shape (n_samples, n_features)
Training data.
Y : ndarray, shape (n_samples, n_classes)
Transformed labels according to the output of LabelBinarizer.
alpha : float
Regularization parameter. alpha is equal to 1 / C.
sample_weight : array-like, shape (n_samples,) optional
Array of weights that are assigned to individual samples.
If not provided, then each sample is given unit weight.
Returns
-------
loss : float
Multinomial loss.
p : ndarray, shape (n_samples, n_classes)
Estimated class probabilities.
w : ndarray, shape (n_classes, n_features)
Reshaped param vector excluding intercept terms.
"""
n_classes = Y.shape[1]
n_features = X.shape[1]
fit_intercept = w.size == (n_classes * (n_features + 1))
w = w.reshape(n_classes, -1)
sample_weight = sample_weight[:, np.newaxis]
if fit_intercept:
intercept = w[:, -1]
w = w[:, :-1]
else:
intercept = 0
p = safe_sparse_dot(X, w.T)
p += intercept
p -= logsumexp(p, axis=1)[:, np.newaxis]
loss = -(sample_weight * Y * p).sum()
loss += 0.5 * alpha * squared_norm(w)
p = np.exp(p, p)
return loss, p, w
def _intercept_dot(w, X, y):
"""Computes y * np.dot(X, w).
It takes into consideration if the intercept should be fit or not.
Parameters
----------
w : ndarray, shape (n_features,) or (n_features + 1,)
Coefficient vector.
X : {array-like, sparse matrix}, shape (n_samples, n_features)
Training data.
y : ndarray, shape (n_samples,)
Array of labels.
"""
c = 0.
if w.size == X.shape[1] + 1:
c = w[-1]
w = w[:-1]
z = safe_sparse_dot(X, w) + c
return w, c, y * z