Source code for beta_nmf

# -*- coding: utf-8 -*-
"""
beta\_nmf.py
~~~~~~~~~~~

.. topic:: Contents

  The beta_nmf module includes the BetaNMF class,
  fit function and theano functions to compute updates and cost."""

import time
import numpy as np
import theano
import base
import updates
import costs


class BetaNMF(object):
    """BetaNMF class

    Performs nonnegative matrix factorization with Theano.
    L1-sparsity and group sparsity constraints can be applied on
    activations.

    Parameters
    ----------
    data_shape : tuple composed of integers
        the shape of the data to approximate
    n_components : positive integer (default 50)
        the number of latent components for the NMF model
    beta : arbitrary float (default 2)
        the beta-divergence to consider, particular cases of interest are
         * beta=2 : Euclidean distance
         * beta=1 : Kullback Leibler
         * beta=0 : Itakura-Saito
    n_iter : positive integer (default 100)
        number of iterations
    fixed_factors : array (default None)
        list of factors that are not updated
            e.g. fixed_factors = [0] -> H is not updated
                 fixed_factors = [1] -> W is not updated
    l_sparse : float (default 0.)
        sparsity constraint applied on activations
    sparse_idx : array
        boundaries of the groups for group sparsity [start, stop]
    verbose : integer
        the frequency at which the score should be computed and displayed
        (number of iterations between each computation)

    Attributes
    ----------
    factors : list of arrays
        The estimated factors (factors[0] = H)
    """

    def __init__(self, data_shape, n_components=50, beta=2, n_iter=100,
                 fixed_factors=None, verbose=0, l_sparse=0.,
                 sparse_idx=None):
        self.data_shape = data_shape
        # Store the component count once, with a fixed integer dtype
        # (the original assigned this attribute twice in a row; the
        # first assignment was dead).
        self.n_components = np.asarray(n_components, dtype='int32')
        self.beta = theano.shared(np.asarray(beta, theano.config.floatX),
                                  name="beta")
        self.verbose = verbose
        self.n_iter = n_iter
        self.scores = []
        if fixed_factors is None:
            fixed_factors = []
        self.fixed_factors = fixed_factors
        # Random nonnegative initialization: fact_[0] -> H, fact_[1] -> W.
        fact_ = [base.nnrandn((dim, self.n_components))
                 for dim in data_shape]
        self.w = theano.shared(fact_[1].astype(theano.config.floatX),
                               name="W", borrow=True, allow_downcast=True)
        self.h = theano.shared(fact_[0].astype(theano.config.floatX),
                               name="H", borrow=True, allow_downcast=True)
        self.factors = [self.h, self.w]
        # Data buffer; filled by fit().
        self.x = theano.shared(
            np.zeros((data_shape)).astype(theano.config.floatX),
            name="X")
        self.l_sparse = theano.shared(l_sparse, name="l_sparse")
        # Always define sparse_idx so the attribute exists even when no
        # sparsity constraint is requested (the original left it unset
        # when l_sparse == 0, although it was only read when > 0).
        if sparse_idx is None:
            self.sparse_idx = None
        else:
            self.sparse_idx = theano.shared(sparse_idx,
                                            name="sparse_idx")
        self.get_updates_functions()
        self.get_div_function()

    def fit(self, data):
        """Learn the NMF model from data.

        Parameters
        ----------
        data : ndarray with nonnegative entries
            The input array
        """
        self.x.set_value(data.astype(theano.config.floatX))
        print('Fitting NMF model with %d iterations....' % self.n_iter)

        # main loop; tick is initialized once instead of being probed
        # with "'tick' not in locals()" on every iteration
        tick = time.time()
        for it in range(self.n_iter):
            if self.verbose > 0 and it == 0:
                score = self.score()
                print('Iteration %d / %d, duration=%.1fms, cost=%f'
                      % (it, self.n_iter,
                         (time.time() - tick) * 1000, score))
            # Update each factor unless it is pinned via fixed_factors.
            if 1 not in self.fixed_factors:
                self.train_w()
            if 0 not in self.fixed_factors:
                self.train_h()
            if self.verbose > 0 and (it + 1) % self.verbose == 0:
                score = self.score()
                print('Iteration %d / %d, duration=%.1fms, cost=%f'
                      % (it + 1, self.n_iter,
                         (time.time() - tick) * 1000, score))
                tick = time.time()
        print('Done.')

    def get_div_function(self):
        """Compile the theano-based divergence function."""
        self.div = theano.function(inputs=[],
                                   outputs=costs.beta_div(self.x,
                                                          self.w.T,
                                                          self.h,
                                                          self.beta),
                                   name="div",
                                   allow_input_downcast=True)

    def get_updates_functions(self):
        """Compile the theano based update functions."""
        print("Standard rules for beta-divergence")
        # Pick the H update rule depending on the sparsity settings;
        # W is always updated with the plain multiplicative rule.
        if self.l_sparse.get_value() == 0:
            h_update = updates.beta_H(self.x, self.w, self.h, self.beta)
        else:
            if self.sparse_idx is None:
                h_update = updates.beta_H_Sparse(self.x, self.w,
                                                 self.h, self.beta,
                                                 self.l_sparse)
            else:
                h_update = updates.beta_H_groupSparse(self.x, self.w,
                                                      self.h, self.beta,
                                                      self.l_sparse,
                                                      self.sparse_idx[0, ],
                                                      self.sparse_idx[1, ])
        w_update = updates.beta_W(self.x, self.w, self.h, self.beta)
        self.train_h = theano.function(inputs=[],
                                       outputs=[],
                                       updates={self.h: h_update},
                                       name="trainH",
                                       allow_input_downcast=True)
        self.train_w = theano.function(inputs=[],
                                       outputs=[],
                                       updates={self.w: w_update},
                                       name="trainW",
                                       allow_input_downcast=True)

    def score(self):
        """Compute the factorisation score.

        Returns
        -------
        out : float
            factorisation score (beta-divergence between data and model)
        """
        return self.div()