Source code for ruckus.sampling
import numpy as _np
[docs]class KernelHerd():
r"""
Iterator which produces the kernel herd [1] from an RKHS and a distribution embedded in it.
**Caution:** this method only works well if the embeddings :math:`\phi(x)` have little overlap with one another!
Given a distribution embedding:
.. math::
\mu_p = \int \phi(x) p(x) dx \ ,
a deterministic chaotic sequence :math:`(x_t)` may be generated by the following
algorithm, beginning with a vector :math:`w_0\in H`:
.. math::
x_{t} = \underset{x}{\mathrm{argmax}}\left<\phi(x),w_t\right> \\
w_{t+1} = w_t + \mu_p - \phi(x_t)
Over time, the kernel embedding of the sequence
.. math::
\hat{\mu}_T = \frac{1}{T}\sum_0^{T-1} \phi(x_t)
converges to :math:`\mu_p` in :math:`O(T^{-1})`.
1. `Chen, Y., Welling, M., Smola, A. "Super-Samples from Kernel Herding." Proceedings of the Twenty-Sixth Conference on Uncertainty in Artificial Intelligence (UAI2010) <https://arxiv.org/abs/1203.3472>`_
==========
Parameters
==========
:param p: The embedded vector :math:`\mu_p`.
:type p: :py:class:`numpy.ndarray`
:param rkhs: The fitted RKHS in which :math:`\mu_p` is embedded.
:type rkhs: :py:class:`RKHS`
:param size: Number of samples to be generated.
:type size: int
:param w_init: The initial vector for the herding algorithm. If ``None``, this is set to ``p``.
:type w_init: :py:class:`numpy.ndarray`
:param domain: The range of values to be sampled from. If ``None``, this is set to ``rkhs.X_fit_``.
:type domain: :py:class:`numpy.ndarray`
"""
def __init__(self,p,rkhs,size,w_init=None,domain=None,):
self.p = p
self.rkhs = rkhs
self.size = size
if domain is None:
self._X = rkhs.X_fit_
else:
self._X = domain
self._Phi = rkhs.transform(self._X)
if w_init is None:
self._w = p
else:
self._w = w_init
self._n = 0
def __iter__(self):
return self
def __next__(self):
return self._herd_step()
def _herd_step(self):
if self._n < self.size:
#k = _np.argmax(_np.tensordot(self._Phi,self._w,axes=(tuple(range(1,len(self.rkhs.shape_out_)+1,)),
# tuple(range(len(self.rkhs.shape_out_))))))
k = _np.argmax(self._Phi@self._w)
self._w = self._w - self._Phi[k] + self.p
self._n += 1
return self._X[k]
raise StopIteration