Source code for ruckus.sampling

import numpy as _np

[docs]class KernelHerd(): r""" Iterator which produces the kernel herd [1] from an RKHS and a distribution embedded in it. **Caution:** this method only works well if the embeddings :math:`\phi(x)` have little overlap with one another! Given a distribution embedding: .. math:: \mu_p = \int \phi(x) p(x) dx \ , a deterministic chaotic sequence :math:`(x_t)` may be generated by the following algorithm, beginning with a vector :math:`w_0\in H`: .. math:: x_{t} = \underset{x}{\mathrm{argmax}}\left<\phi(x),w_t\right> \\ w_{t+1} = w_t + \mu_p - \phi(x_t) Over time, the kernel embedding of the sequence .. math:: \hat{\mu}_T = \frac{1}{T}\sum_0^{T-1} \phi(x_t) converges to :math:`\mu_p` in :math:`O(T^{-1})`. 1. `Chen, Y., Welling, M., Smola, A. "Super-Samples from Kernel Herding." Proceedings of the Twenty-Sixth Conference on Uncertainty in Artificial Intelligence (UAI2010) <https://arxiv.org/abs/1203.3472>`_ ========== Parameters ========== :param p: The embedded vector :math:`\mu_p`. :type p: :py:class:`numpy.ndarray` :param rkhs: The fitted RKHS in which :math:`\mu_p` is embedded. :type rkhs: :py:class:`RKHS` :param size: Number of samples to be generated. :type size: int :param w_init: The initial vector for the herding algorithm. If ``None``, this is set to ``p``. :type w_init: :py:class:`numpy.ndarray` :param domain: The range of values to be sampled from. If ``None``, this is set to ``rkhs.X_fit_``. :type domain: :py:class:`numpy.ndarray` """ def __init__(self,p,rkhs,size,w_init=None,domain=None,): self.p = p self.rkhs = rkhs self.size = size if domain is None: self._X = rkhs.X_fit_ else: self._X = domain self._Phi = rkhs.transform(self._X) if w_init is None: self._w = p else: self._w = w_init self._n = 0 def __iter__(self): return self def __next__(self): return self._herd_step() def _herd_step(self): if self._n < self.size: #k = _np.argmax(_np.tensordot(self._Phi,self._w,axes=(tuple(range(1,len(self.rkhs.shape_out_)+1,)), # tuple(range(len(self.rkhs.shape_out_)))))) k = _np.argmax(self._Phi@self._w) self._w = self._w - self._Phi[k] + self.p self._n += 1 return self._X[k] raise StopIteration