{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "c19a2efe", "metadata": {}, "outputs": [], "source": [ "import sys\n", "import random \n", "import time\n", "import math\n", "import random\n", "import numpy as np\n", "from scipy import stats\n", "from random import randint\n", "from util import *\n", "from stats import Histogram\n", "\n", "def randomFloat(low, high):\n", " \"\"\"\n", " sample float within range\n", " Parameters\n", " low : low valuee\n", " high : high valuee\n", " \"\"\"\n", " return random.random() * (high-low) + low\n", "\n", "def randomInt(minv, maxv):\n", " \"\"\"\n", " sample int within range\n", " Parameters\n", " minv : low valuee\n", " maxv : high valuee\n", " \"\"\"\n", " return randint(minv, maxv)\n", "\n", "def randIndex(lData):\n", " \"\"\"\n", " random index of a list\n", " Parameters\n", " lData : list data\n", " \"\"\"\n", " return randint(0, len(lData)-1)\n", "\n", "def randomUniformSampled(low, high):\n", " \"\"\"\n", " sample float within range\n", "\n", " Parameters\n", " low : low value\n", " high : high value\n", " \"\"\"\n", " return np.random.uniform(low, high)\n", "\n", "def randomUniformSampledList(low, high, size):\n", " \"\"\"\n", " sample floats within range to create list\n", " Parameters\n", " low : low value\n", " high : high value\n", " size ; size of list to be returned\n", " \"\"\"\n", " return np.random.uniform(low, high, size)\n", "\n", "def randomNormSampled(mean, sd):\n", " \"\"\"\n", " sample float from normal\n", " Parameters\n", " mean : mean\n", " sd : std deviation\n", " \"\"\"\n", " return np.random.normal(mean, sd)\n", "\n", "def randomNormSampledList(mean, sd, size):\n", " \"\"\"\n", " sample float list from normal \n", " Parameters\n", " mean : mean\n", " sd : std deviation\n", " size : size of list to be returned\n", " \"\"\"\n", " return np.random.normal(mean, sd, size)\n", "\n", "def randomSampledList(sampler, size):\n", " \"\"\"\n", " sample list from given sampler \n", " Parameters\n", " sampler : sampler object\n", " size : size of list to be returned\n", " \"\"\"\n", " return list(map(lambda i : sampler.sample(), range(size)))\n", "\n", "\n", "def minLimit(val, minv):\n", " \"\"\"\n", " min limit\n", "\n", " Parameters\n", " val : value\n", " minv : min limit\n", " \"\"\"\n", " if (val < minv):\n", " val = minv\n", " return val\n", "\n", "\n", "def rangeLimit(val, minv, maxv):\n", " \"\"\"\n", " range limit\n", " Parameters\n", " val : value\n", " minv : min limit\n", " maxv : max limit\n", " \"\"\"\n", " if (val < minv):\n", " val = minv\n", " elif (val > maxv):\n", " val = maxv\n", " return val\n", "\n", "\n", "def sampleUniform(minv, maxv):\n", " \"\"\"\n", " sample int within range\n", " Parameters\n", " minv ; int min limit\n", " maxv : int max limit\n", " \"\"\"\n", " return randint(minv, maxv)\n", "\n", "\n", "def sampleFromBase(value, dev):\n", " \"\"\"\n", " sample int wrt base\n", " Parameters\n", " value : base value\n", " dev : deviation\n", " \"\"\"\n", " return randint(value - dev, value + dev)\n", "\n", "\n", "def sampleFloatFromBase(value, dev):\n", " \"\"\"\n", " sample float wrt base\n", " Parameters\n", " value : base value\n", " dev : deviation\n", " \"\"\"\n", " return randomFloat(value - dev, value + dev)\n", "\n", "\n", "def distrUniformWithRanndom(total, numItems, noiseLevel):\n", " \"\"\"\n", " uniformly distribute with some randomness and preserves total\n", " Parameters\n", " total : total count\n", " numItems : no of bins\n", " noiseLevel : noise level fraction\n", " \"\"\"\n", " perItem = total / numItems\n", " var = perItem * noiseLevel\n", " items = []\n", " for i in range(numItems):\n", " item = perItem + randomFloat(-var, var)\n", " items.append(item)\t\n", "\n", " #adjust last item\n", " sm = sum(items[:-1])\n", " items[-1] = total - sm\n", " return items\n", "\n", "\n", "def isEventSampled(threshold, maxv=100):\n", " \"\"\"\n", " sample event which occurs if sampled below threshold\n", " Parameters\n", " threshold : threshold for sampling\n", " maxv : maximum values\n", " \"\"\"\n", " return randint(0, maxv) < threshold\n", "\n", "\n", "def sampleBinaryEvents(events, probPercent):\n", " \"\"\"\n", " sample binary events\n", " Parameters\n", " events : two events\n", " probPercent : probability as percentage\n", " \"\"\"\n", " if (randint(0, 100) < probPercent):\n", " event = events[0]\n", " else:\n", " event = events[1]\n", " return event\n", "\n", "\n", "def addNoiseNum(value, sampler):\n", " \"\"\"\n", " add noise to numeric value\n", " Parameters\n", " value : base value\n", " sampler : sampler for noise\n", " \"\"\"\n", " return value * (1 + sampler.sample())\n", "\n", "\n", "def addNoiseCat(value, values, noise):\t\n", " \"\"\"\n", " add noise to categorical value i.e with some probability change value\n", " Parameters\n", " value : cat value\n", " values : cat values\n", " noise : noise level fraction\n", " \"\"\"\n", " newValue = value\n", " threshold = int(noise * 100)\n", " if (isEventSampled(threshold)):\t\t\n", " newValue = selectRandomFromList(values)\n", " while newValue == value:\n", " newValue = selectRandomFromList(values)\n", " return newValue\n", "\n", "\n", "def sampleWithReplace(data, sampSize):\n", " \"\"\"\n", " sample with replacement\n", " Parameters\n", " data : array\n", " sampSize : sample size\n", " \"\"\"\n", " sampled = list()\n", " le = len(data)\n", " if sampSize is None:\n", " sampSize = le\n", " for i in range(sampSize):\n", " j = random.randint(0, le - 1)\n", " sampled.append(data[j])\n", " return sampled\n", "\n", "class CumDistr:\n", " \"\"\"\n", " cumulative distr\n", " \"\"\"\n", "\n", " def __init__(self, data, numBins = None):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " data : array\n", " numBins : no of bins\n", " \"\"\"\n", " if not numBins:\n", " numBins = int(len(data) / 5)\n", " res = stats.cumfreq(data, numbins=numBins)\n", " self.cdistr = res.cumcount / len(data)\n", " self.loLim = res.lowerlimit\n", " self.upLim = res.lowerlimit + res.binsize * res.cumcount.size\n", " self.binWidth = res.binsize\n", "\n", " def getDistr(self, value):\n", " \"\"\"\n", " get cumulative distribution\n", "\n", " Parameters\n", " value : value\n", " \"\"\"\n", " if value <= self.loLim:\n", " d = 0.0\n", " elif value >= self.upLim:\n", " d = 1.0\n", " else:\n", " bin = int((value - self.loLim) / self.binWidth)\n", " d = self.cdistr[bin]\n", " return d\n", "\n", "class BernoulliTrialSampler:\n", " \"\"\"\n", " bernoulli trial sampler return True or False\n", " \"\"\"\n", "\n", " def __init__(self, pr):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " pr : probability\n", " \"\"\"\n", " self.pr = pr\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " return random.random() < self.pr\n", "\n", "class PoissonSampler:\n", " \"\"\"\n", " poisson sampler returns number of events\n", " \"\"\"\n", " def __init__(self, rateOccur, maxSamp):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " rateOccur : rate of occurence\n", " maxSamp : max limit on no of samples\n", " \"\"\"\n", " self.rateOccur = rateOccur\n", " self.maxSamp = int(maxSamp)\n", " self.pmax = self.calculatePr(rateOccur)\n", "\n", " def calculatePr(self, numOccur):\n", " \"\"\"\n", " calulates probability\n", "\n", " Parameters\n", " numOccur : no of occurence\n", " \"\"\"\n", " p = (self.rateOccur ** numOccur) * math.exp(-self.rateOccur) / math.factorial(numOccur)\n", " return p\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " done = False\n", " samp = 0\n", " while not done:\n", " no = randint(0, self.maxSamp)\n", " sp = randomFloat(0.0, self.pmax)\n", " ap = self.calculatePr(no)\n", " if sp < ap:\n", " done = True\n", " samp = no\n", " return samp\n", "\n", "class ExponentialSampler:\n", " \"\"\"\n", " returns interval between events\n", " \"\"\"\n", " def __init__(self, rateOccur, maxSamp = None):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " rateOccur : rate of occurence\n", " maxSamp : max limit on interval\n", " \"\"\"\n", " self.interval = 1.0 / rateOccur\n", " self.maxSamp = int(maxSamp) if maxSamp is not None else None\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " sampled = np.random.exponential(scale=self.interval)\n", " if self.maxSamp is not None:\n", " while sampled > self.maxSamp:\n", " sampled = np.random.exponential(scale=self.interval)\n", " return sampled\n", "\n", "class UniformNumericSampler:\n", " \"\"\"\n", " uniform sampler for numerical values\n", " \"\"\"\n", " def __init__(self, minv, maxv):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " minv : min value\n", " maxv : max value\n", " \"\"\"\n", " self.minv = minv\n", " self.maxv = maxv\n", "\n", " def isNumeric(self):\n", " \"\"\"\n", " returns true\n", " \"\"\"\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " samp =\tsampleUniform(self.minv, self.maxv) if isinstance(self.minv, int) else randomFloat(self.minv, self.maxv)\n", " return samp\t\n", "\n", "class UniformCategoricalSampler:\n", " \"\"\"\n", " uniform sampler for categorical values\n", " \"\"\"\n", " def __init__(self, cvalues):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " cvalues : categorical value list\n", " \"\"\"\n", " self.cvalues = cvalues\n", "\n", " def isNumeric(self):\n", " return False\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " return selectRandomFromList(self.cvalues)\t\n", "\n", "class NormalSampler:\n", " \"\"\"\n", " normal sampler\n", " \"\"\"\n", " def __init__(self, mean, stdDev):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " mean : mean\n", " stdDev : std deviation\n", " \"\"\"\n", " self.mean = mean\n", " self.stdDev = stdDev\n", " self.sampleAsInt = False\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sampleAsIntValue(self):\n", " \"\"\"\n", " set True to sample as int\n", " \"\"\"\n", " self.sampleAsInt = True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " samp = np.random.normal(self.mean, self.stdDev)\n", " if self.sampleAsInt:\n", " samp = int(samp)\n", " return samp\n", "\n", "class LogNormalSampler:\n", " \"\"\"\n", " log normal sampler\n", " \"\"\"\n", " def __init__(self, mean, stdDev):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " mean : mean\n", " stdDev : std deviation\n", " \"\"\"\n", " self.mean = mean\n", " self.stdDev = stdDev\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " return np.random.lognormal(self.mean, self.stdDev)\n", "\n", "class NormalSamplerWithTrendCycle:\n", " \"\"\"\n", " normal sampler with cycle and trend\n", " \"\"\"\n", " def __init__(self, mean, stdDev, dmean, cycle, step=1):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " mean : mean\n", " stdDev : std deviation\n", " dmean : trend delta\n", " cycle : cycle values wrt base mean\n", " step : adjustment step for cycle and trend\n", " \"\"\"\n", " self.mean = mean\n", " self.cmean = mean\n", " self.stdDev = stdDev\n", " self.dmean = dmean\n", " self.cycle = cycle\n", " self.clen = len(cycle) if cycle is not None else 0\n", " self.step = step\n", " self.count = 0\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " s = np.random.normal(self.cmean, self.stdDev)\n", " self.count += 1\n", " if self.count % self.step == 0:\n", " cy = 0\n", " if self.clen > 1:\n", " coff = self.count % self.clen\n", " cy = self.cycle[coff]\n", " tr = self.count * self.dmean\n", " self.cmean = self.mean + tr + cy\n", " return s\n", "\n", "\n", "class ParetoSampler:\n", " \"\"\"\n", " pareto sampler\n", " \"\"\"\n", " def __init__(self, mode, shape):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " mode : mode\n", " shape : shape\n", " \"\"\"\n", " self.mode = mode\n", " self.shape = shape\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " return (np.random.pareto(self.shape) + 1) * self.mode\n", "\n", "class GammaSampler:\n", " \"\"\"\n", " pareto sampler\n", " \"\"\"\n", " def __init__(self, shape, scale):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " shape : shape\n", " scale : scale\n", " \"\"\"\n", " self.shape = shape\n", " self.scale = scale\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " return np.random.gamma(self.shape, self.scale)\n", "\n", "class GaussianRejectSampler:\n", " \"\"\"\n", " gaussian sampling based on rejection sampling\n", " \"\"\"\n", " def __init__(self, mean, stdDev):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " mean : mean\n", " stdDev : std deviation\n", " \"\"\"\n", " self.mean = mean\n", " self.stdDev = stdDev\n", " self.xmin = mean - 3 * stdDev\n", " self.xmax = mean + 3 * stdDev\n", " self.ymin = 0.0\n", " self.fmax = 1.0 / (math.sqrt(2.0 * 3.14) * stdDev)\n", " self.ymax = 1.05 * self.fmax\n", " self.sampleAsInt = False\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sampleAsIntValue(self):\n", " \"\"\"\n", " sample as int value\n", " \"\"\"\n", " self.sampleAsInt = True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " done = False\n", " samp = 0\n", " while not done:\n", " x = randomFloat(self.xmin, self.xmax)\n", " y = randomFloat(self.ymin, self.ymax)\n", " f = self.fmax * math.exp(-(x - self.mean) * (x - self.mean) / (2.0 * self.stdDev * self.stdDev))\n", " if (y < f):\n", " done = True\n", " samp = x\n", " if self.sampleAsInt:\n", " samp = int(samp)\n", " return samp\n", "\n", "class DiscreteRejectSampler:\n", " \"\"\"\n", " non parametric sampling for discrete values using given distribution based \n", " on rejection sampling\t\n", " \"\"\"\n", " def __init__(self, xmin, xmax, step, *values):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " xmin : min value\n", " xmax : max value\n", " step : discrete step\n", " values : distr values\n", " \"\"\"\n", " self.xmin = xmin\n", " self.xmax = xmax\n", " self.step = step\n", " self.distr = values\n", " if (len(self.distr) == 1):\n", " self.distr = self.distr[0]\t\n", " numSteps = int((self.xmax - self.xmin) / self.step)\n", " #print(\"{:.3f} {:.3f} {:.3f} {}\".format(self.xmin, self.xmax, self.step, numSteps))\n", " assert len(self.distr)\t== numSteps + 1, \"invalid number of distr values expected {}\".format(numSteps + 1)\n", " self.ximin = 0\n", " self.ximax = numSteps\n", " self.pmax = float(max(self.distr))\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " done = False\n", " samp = None\n", " while not done:\n", " xi = randint(self.ximin, self.ximax)\n", " #print(formatAny(xi, \"xi\"))\n", " ps = randomFloat(0.0, self.pmax)\n", " pa = self.distr[xi]\n", " if ps < pa:\n", " samp = self.xmin + xi * self.step\n", " done = True\n", " return samp\n", "\n", "\n", "class TriangularRejectSampler:\n", " \"\"\"\n", " non parametric sampling using triangular distribution based on rejection sampling\t\n", " \"\"\"\n", " def __init__(self, xmin, xmax, vertexValue, vertexPos=None):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " xmin : min value\n", " xmax : max value\n", " vertexValue : distr value at vertex\n", " vertexPos : vertex pposition\n", " \"\"\"\n", " self.xmin = xmin\n", " self.xmax = xmax\n", " self.vertexValue = vertexValue\n", " if vertexPos: \n", " assert vertexPos > xmin and vertexPos < xmax, \"vertex position outside bound\"\n", " self.vertexPos = vertexPos\n", " else:\n", " self.vertexPos = 0.5 * (xmin + xmax)\n", " self.s1 = vertexValue / (self.vertexPos - xmin)\n", " self.s2 = vertexValue / (xmax - self.vertexPos)\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " done = False\n", " samp = None\n", " while not done:\n", " x = randomFloat(self.xmin, self.xmax)\n", " y = randomFloat(0.0, self.vertexValue)\n", " f = (x - self.xmin) * self.s1 if x < self.vertexPos else (self.xmax - x) * self.s2\n", " if (y < f):\n", " done = True\n", " samp = x\n", "\n", " return samp;\t\n", "\n", "class NonParamRejectSampler:\n", " \"\"\"\n", " non parametric sampling using given distribution based on rejection sampling\t\n", " \"\"\"\n", " def __init__(self, xmin, binWidth, *values):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " xmin : min value\n", " binWidth : bin width\n", " values : distr values\n", " \"\"\"\n", " self.values = values\n", " if (len(self.values) == 1):\n", " self.values = self.values[0]\n", " self.xmin = xmin\n", " self.xmax = xmin + binWidth * (len(self.values) - 1)\n", " #print(self.xmin, self.xmax, binWidth)\n", " self.binWidth = binWidth\n", " self.fmax = 0\n", " for v in self.values:\n", " if (v > self.fmax):\n", " self.fmax = v\n", " self.ymin = 0\n", " self.ymax = self.fmax\n", " self.sampleAsInt = True\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sampleAsFloat(self):\n", " self.sampleAsInt = False\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " done = False\n", " samp = 0\n", " while not done:\n", " if self.sampleAsInt:\n", " x = random.randint(self.xmin, self.xmax)\n", " y = random.randint(self.ymin, self.ymax)\n", " else:\n", " x = randomFloat(self.xmin, self.xmax)\n", " y = randomFloat(self.ymin, self.ymax)\n", " bin = int((x - self.xmin) / self.binWidth)\n", " f = self.values[bin]\n", " if (y < f):\n", " done = True\n", " samp = x\n", " return samp\n", "\n", "class JointNonParamRejectSampler:\n", " \"\"\"\n", " non parametric sampling using given distribution based on rejection sampling\t\n", " \"\"\"\n", " def __init__(self, xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " xmin : min value for x\n", " xbinWidth : bin width for x\n", " xnbin : no of bins for x\n", " ymin : min value for y\n", " ybinWidth : bin width for y\n", " ynbin : no of bins for y\n", " values : distr values\n", " \"\"\"\n", " self.values = values\n", " if (len(self.values) == 1):\n", " self.values = self.values[0]\n", " assert len(self.values) == xnbin * ynbin, \"wrong number of values for joint distr\"\n", " self.xmin = xmin\n", " self.xmax = xmin + xbinWidth * xnbin\n", " self.xbinWidth = xbinWidth\n", " self.ymin = ymin\n", " self.ymax = ymin + ybinWidth * ynbin\n", " self.ybinWidth = ybinWidth\n", " self.pmax = max(self.values)\n", " self.values = np.array(self.values).reshape(xnbin, ynbin)\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " done = False\n", " samp = 0\n", " while not done:\n", " x = randomFloat(self.xmin, self.xmax)\n", " y = randomFloat(self.ymin, self.ymax)\n", " xbin = int((x - self.xmin) / self.xbinWidth)\n", " ybin = int((y - self.ymin) / self.ybinWidth)\n", " ap = self.values[xbin][ybin]\n", " sp = randomFloat(0.0, self.pmax)\n", " if (sp < ap):\n", " done = True\n", " samp = [x,y]\n", " return samp\n", "\n", "\n", "class JointNormalSampler:\n", " \"\"\"\n", " joint normal sampler\t\n", " \"\"\"\n", " def __init__(self, *values):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " values : 2 mean values followed by 4 values for covar matrix\n", " \"\"\"\n", " lvalues = list(values)\n", " assert len(lvalues) == 6, \"incorrect number of arguments for joint normal sampler\"\n", " mean = lvalues[:2]\n", " self.mean = np.array(mean)\n", " sd = lvalues[2:]\n", " self.sd = np.array(sd).reshape(2,2)\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " return list(np.random.multivariate_normal(self.mean, self.sd))\n", "\n", "\n", "class MultiVarNormalSampler:\n", " \"\"\"\n", " muti variate normal sampler\t\n", " \"\"\"\n", " def __init__(self, numVar, *values):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " numVar : no of variables\n", " values : numVar mean values followed by numVar x numVar values for covar matrix\n", " \"\"\"\n", " lvalues = list(values)\n", " assert len(lvalues) == numVar + numVar * numVar, \"incorrect number of arguments for multi var normal sampler\"\n", " mean = lvalues[:numVar]\n", " self.mean = np.array(mean)\n", " sd = lvalues[numVar:]\n", " self.sd = np.array(sd).reshape(numVar,numVar)\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " return list(np.random.multivariate_normal(self.mean, self.sd))\n", "\n", "class CategoricalRejectSampler:\n", " \"\"\"\n", " non parametric sampling for categorical attributes using given distribution based \n", " on rejection sampling\t\n", " \"\"\"\n", " def __init__(self, *values):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " values : list of tuples which contains a categorical value and the corresponsding distr value\n", " \"\"\"\n", " self.distr = values\n", " if (len(self.distr) == 1):\n", " self.distr = self.distr[0]\n", " maxv = 0\n", " for t in self.distr:\n", " if t[1] > maxv:\n", " maxv = t[1]\n", " self.maxv = maxv\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " done = False\n", " samp = \"\"\n", " while not done:\n", " t = self.distr[randint(0, len(self.distr)-1)]\t\n", " d = randomFloat(0, self.maxv)\t\n", " if (d <= t[1]):\n", " done = True\n", " samp = t[0]\n", " return samp\n", "\n", "\n", "class DistrMixtureSampler:\n", " \"\"\"\n", " distr mixture sampler\n", " \"\"\"\n", " def __init__(self, mixtureWtDistr, *compDistr):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " mixtureWtDistr : sampler that returns index into sampler list\n", " compDistr : sampler list\n", " \"\"\"\n", " self.mixtureWtDistr = mixtureWtDistr\n", " self.compDistr = compDistr\n", " if (len(self.compDistr) == 1):\n", " self.compDistr = self.compDistr[0]\n", "\n", " def isNumeric(self):\n", " return True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " comp = self.mixtureWtDistr.sample()\n", "\n", " #sample sampled comp distr\n", " return self.compDistr[comp].sample()\n", "\n", "class AncestralSampler:\n", " \"\"\"\n", " ancestral sampler using conditional distribution\n", " \"\"\"\n", " def __init__(self, parentDistr, childDistr, numChildren):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " parentDistr : parent distr\n", " childDistr : childdren distribution dictionary\n", " numChildren : no of children\n", " \"\"\"\n", " self.parentDistr = parentDistr\n", " self.childDistr = childDistr\n", " self.numChildren = numChildren\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " parent = self.parentDistr.sample()\n", "\n", " #sample all children conditioned on parent\n", " children = []\n", " for i in range(self.numChildren):\n", " key = (parent, i)\n", " child = self.childDistr[key].sample()\n", " children.append(child)\n", " return (parent, children)\n", "\n", "class ClusterSampler:\n", " \"\"\"\n", " sample cluster and then sample member of sampled cluster\n", " \"\"\"\n", " def __init__(self, clusters, *clustDistr):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " clusters : dictionary clusters\n", " clustDistr : distr for clusters\n", " \"\"\"\n", " self.sampler = CategoricalRejectSampler(*clustDistr)\n", " self.clusters = clusters\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " cluster = self.sampler.sample()\n", " member = random.choice(self.clusters[cluster])\n", " return (cluster, member)\n", "\n", "\n", "class MetropolitanSampler:\n", " \"\"\"\n", " metropolitan sampler\t\n", " \"\"\"\n", " def __init__(self, propStdDev, min, binWidth, values):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " propStdDev : proposal distr std dev\n", " min : min domain value for target distr\n", " binWidth : bin width\n", " values : target distr values\n", " \"\"\"\n", " self.targetDistr = Histogram.createInitialized(min, binWidth, values)\n", " self.propsalDistr = GaussianRejectSampler(0, propStdDev)\n", " self.proposalMixture = False\n", "\n", " # bootstrap sample\n", " (minv, maxv) = self.targetDistr.getMinMax()\n", " self.curSample = random.randint(minv, maxv)\n", " self.curDistr = self.targetDistr.value(self.curSample)\n", " self.transCount = 0\n", "\n", " def initialize(self):\n", " \"\"\"\n", " initialize\n", " \"\"\"\n", " (minv, maxv) = self.targetDistr.getMinMax()\n", " self.curSample = random.randint(minv, maxv)\n", " self.curDistr = self.targetDistr.value(self.curSample)\n", " self.transCount = 0\n", "\n", " def setProposalDistr(self, propsalDistr):\n", " \"\"\"\n", " set custom proposal distribution\n", " Parameters\n", " propsalDistr : proposal distribution\n", " \"\"\"\n", " self.propsalDistr = propsalDistr\n", "\n", "\n", " def setGlobalProposalDistr(self, globPropStdDev, proposalChoiceThreshold):\n", " \"\"\"\n", " set custom proposal distribution\n", " Parameters\n", " globPropStdDev : global proposal distr std deviation\n", " proposalChoiceThreshold : threshold for using global proposal distribution\n", " \"\"\"\n", " self.globalProposalDistr = GaussianRejectSampler(0, globPropStdDev)\n", " self.proposalChoiceThreshold = proposalChoiceThreshold\n", " self.proposalMixture = True\n", "\n", " def sample(self):\n", " \"\"\"\n", " samples value\n", " \"\"\"\n", " nextSample = self.proposalSample(1)\n", " self.targetSample(nextSample)\n", " return self.curSample;\n", "\n", " def proposalSample(self, skip):\n", " \"\"\"\n", " sample from proposal distribution\n", " Parameters\n", " skip : no of samples to skip\n", " \"\"\"\n", " for i in range(skip):\n", " if not self.proposalMixture:\n", " #one proposal distr\n", " nextSample = self.curSample + self.propsalDistr.sample()\n", " nextSample = self.targetDistr.boundedValue(nextSample)\n", " else:\n", " #mixture of proposal distr\n", " if random.random() < self.proposalChoiceThreshold:\n", " nextSample = self.curSample + self.propsalDistr.sample()\n", " else:\n", " nextSample = self.curSample + self.globalProposalDistr.sample()\n", " nextSample = self.targetDistr.boundedValue(nextSample)\n", "\n", " return nextSample\n", "\n", " def targetSample(self, nextSample):\n", " \"\"\"\n", " target sample\n", " Parameters\n", " nextSample : proposal distr sample\n", " \"\"\"\n", " nextDistr = self.targetDistr.value(nextSample)\n", "\n", " transition = False\n", " if nextDistr > self.curDistr:\n", " transition = True\n", " else:\n", " distrRatio = float(nextDistr) / self.curDistr\n", " if random.random() < distrRatio:\n", " transition = True\n", "\n", " if transition:\n", " self.curSample = nextSample\n", " self.curDistr = nextDistr\n", " self.transCount += 1\n", "\n", "\n", " def subSample(self, skip):\n", " \"\"\"\n", " sub sample\n", " Parameters\n", " skip : no of samples to skip\n", " \"\"\"\n", " nextSample = self.proposalSample(skip)\n", " self.targetSample(nextSample)\n", " return self.curSample;\n", "\n", " def setMixtureProposal(self, globPropStdDev, mixtureThreshold):\n", " \"\"\"\n", " mixture proposal\n", " Parameters\n", " globPropStdDev : global proposal distr std deviation\n", " mixtureThreshold : threshold for using global proposal distribution\n", " \"\"\"\n", " self.globalProposalDistr = GaussianRejectSampler(0, globPropStdDev)\n", " self.mixtureThreshold = mixtureThreshold\n", "\n", " def samplePropsal(self):\n", " \"\"\"\n", " sample from proposal distr\n", " \"\"\"\n", " if self.globalPropsalDistr is None:\n", " proposal = self.propsalDistr.sample()\n", " else:\n", " if random.random() < self.mixtureThreshold:\n", " proposal = self.propsalDistr.sample()\n", " else:\n", " proposal = self.globalProposalDistr.sample()\n", "\n", " return proposal\n", "\n", "class PermutationSampler:\n", " \"\"\"\n", " permutation sampler by shuffling a list\n", " \"\"\"\n", " def __init__(self):\n", " \"\"\"\n", " initialize\n", " \"\"\"\n", " self.values = None\n", " self.numShuffles = None\n", "\n", " @staticmethod\n", " def createSamplerWithValues(values, *numShuffles):\n", " \"\"\"\n", " creator with values\n", " Parameters\n", " values : list data\n", " numShuffles : no of shuffles or range of no of shuffles\n", " \"\"\"\n", " sampler = PermutationSampler()\n", " sampler.values = values\n", " sampler.numShuffles = numShuffles\n", " return sampler\n", "\n", " @staticmethod\n", " def createSamplerWithRange(minv, maxv, *numShuffles):\n", " \"\"\"\n", " creator with ramge min and max\n", "\n", " Parameters\n", " minv : min of range\n", " maxv : max of range\n", " numShuffles : no of shuffles or range of no of shuffles\n", " \"\"\"\n", " sampler = PermutationSampler()\n", " sampler.values = list(range(minv, maxv + 1))\n", " sampler.numShuffles = numShuffles\n", " return sampler\n", "\n", " def sample(self):\n", " \"\"\"\n", " sample new permutation\n", " \"\"\"\n", " cloned = self.values.copy()\n", " shuffle(cloned, *self.numShuffles)\n", " return cloned\n", "\n", "class SpikeyDataSampler:\n", " \"\"\"\n", " samples spikey data\n", " \"\"\"\n", " def __init__(self, intvMean, intvScale, distr, spikeValueMean, spikeValueStd, spikeMaxDuration, baseValue = 0):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " intvMean : interval mean\n", " intvScale : interval std dev\n", " distr : type of distr for interval\n", " spikeValueMean : spike value mean\n", " spikeValueStd : spike value std dev\n", " spikeMaxDuration : max duration for spike\n", " baseValue : base or offset value\n", " \"\"\"\n", " if distr == \"norm\":\n", " self.intvSampler = NormalSampler(intvMean, intvScale)\n", " elif distr == \"expo\":\n", " rate = 1.0 / intvScale\n", " self.intvSampler = ExponentialSampler(rate)\n", " else:\n", " raise ValueError(\"invalid distribution\")\n", "\n", " self.spikeSampler = NormalSampler(spikeValueMean, spikeValueStd)\n", " self.spikeMaxDuration = spikeMaxDuration\n", " self.baseValue = baseValue\n", " self.inSpike = False\n", " self.spikeCount = 0\n", " self.baseCount = 0\n", " self.baseLength = int(self.intvSampler.sample())\n", " self.spikeValues = list()\n", " self.spikeLength = None\n", "\n", " def sample(self):\n", " \"\"\"\n", " sample new value\n", " \"\"\"\n", " if self.baseCount <= self.baseLength:\n", " sampled = self.baseValue\n", " self.baseCount += 1\n", " else:\n", " if not self.inSpike:\n", " #starting spike\n", " spikeVal = self.spikeSampler.sample()\n", " self.spikeLength = sampleUniform(1, self.spikeMaxDuration)\n", " spikeMaxPos = 0 if self.spikeLength == 1 else sampleUniform(0, self.spikeLength-1)\n", " self.spikeValues.clear()\n", " for i in range(self.spikeLength):\n", " if i < spikeMaxPos:\n", " frac = (i + 1) / (spikeMaxPos + 1)\n", " frac = sampleFloatFromBase(frac, 0.1 * frac)\n", " elif i > spikeMaxPos:\n", " frac = (self.spikeLength - i) / (self.spikeLength - spikeMaxPos)\n", " frac = sampleFloatFromBase(frac, 0.1 * frac)\n", " else:\n", " frac = 1.0\n", " self.spikeValues.append(frac * spikeVal)\n", " self.inSpike = True\n", " self.spikeCount = 0\n", "\n", "\n", " sampled = self.spikeValues[self.spikeCount]\n", " self.spikeCount += 1\n", "\n", " if self.spikeCount == self.spikeLength:\n", " #ending spike\n", " self.baseCount = 0\n", " self.baseLength = int(self.intvSampler.sample())\n", " self.inSpike = False\n", "\n", " return sampled\n", "\n", "\n", "class EventSampler:\n", " \"\"\"\n", " sample event\n", " \"\"\"\n", " def __init__(self, intvSampler, valSampler=None):\n", " \"\"\"\n", " initializer\n", "\n", " Parameters\n", " intvSampler : interval sampler\n", " valSampler : value sampler\n", " \"\"\"\n", " self.intvSampler = intvSampler\n", " self.valSampler = valSampler\n", " self.trigger = int(self.intvSampler.sample())\n", " self.count = 0\n", "\n", " def reset(self):\n", " \"\"\"\n", " reset trigger\n", " \"\"\"\n", " self.trigger = int(self.intvSampler.sample())\n", " self.count = 0\n", "\n", " def sample(self):\n", " \"\"\"\n", " sample event\n", " \"\"\"\n", " if self.count == self.trigger:\n", " sampled = self.valSampler.sample() if self.valSampler is not None else 1.0\n", " self.trigger = int(self.intvSampler.sample())\n", " self.count = 0\n", " else:\n", " sample = 0.0\n", " self.count += 1\n", " return sampled\n", "\n", "\n", "\n", "\n", "def createSampler(data):\n", " \"\"\"\n", " create sampler\n", "\n", " Parameters\n", " data : sampler description\n", " \"\"\"\n", " #print(data)\n", " items = data.split(\":\")\n", " size = len(items)\n", " dtype = items[-1]\n", " stype = items[-2]\n", " sampler = None\n", " if stype == \"uniform\":\n", " if dtype == \"int\":\n", " min = int(items[0])\n", " max = int(items[1])\n", " sampler = UniformNumericSampler(min, max)\n", " elif dtype == \"float\":\n", " min = float(items[0])\n", " max = float(items[1])\n", " sampler = UniformNumericSampler(min, max)\n", " elif dtype == \"categorical\":\n", " values = items[:-2]\n", " sampler = UniformCategoricalSampler(values)\n", " elif stype == \"normal\":\n", " mean = float(items[0])\n", " sd = float(items[1])\n", " sampler = NormalSampler(mean, sd)\n", " if dtype == \"int\":\n", " sampler.sampleAsIntValue()\n", " elif stype == \"nonparam\":\n", " if dtype == \"int\" or dtype == \"float\":\n", " min = int(items[0])\n", " binWidth = int(items[1])\n", " values = items[2:-2]\n", " values = list(map(lambda v: int(v), values))\n", " sampler = NonParamRejectSampler(min, binWidth, values)\n", " if dtype == \"float\":\n", " sampler.sampleAsFloat()\n", " elif dtype == \"categorical\":\n", " values = list()\n", " for i in range(0, size-2, 2):\n", " cval = items[i]\n", " dist = int(items[i+1])\n", " pair = (cval, dist)\n", " values.append(pair)\n", " sampler = CategoricalRejectSampler(values)\n", " elif stype == \"discrete\":\n", " vmin = int(items[0])\n", " vmax = int(items[1])\n", " step = int(items[2])\n", " values = list(map(lambda i : int(items[i]), range(3, len(items)-2)))\n", " sampler = DiscreteRejectSampler(vmin, vmax, step, values)\n", " else:\n", " raise ValueError(\"invalid sampler type \" + dtype)\n", " return sampler\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 5 }