Source code for pymanopt.solvers.particle_swarm

import time

import numpy as np
import numpy.random as rnd

from pymanopt.solvers.solver import Solver

class ParticleSwarm(Solver):
    """
    Particle swarm optimization method based on pso.m from the manopt
    MATLAB package.
    """

    def __init__(self, maxcostevals=None, maxiter=None, populationsize=None,
                 nostalgia=1.4, social=1.4, *args, **kwargs):
        """
        Instantiate Particle Swarm Optimization (PSO) solver class.

        Variable attributes (defaults in brackets):
            - maxcostevals (max(5000, 2 * dim))
                Maximum number of allowed cost evaluations
            - maxiter (max(500, 4 * dim))
                Maximum number of allowed iterations
            - populationsize (min(40, 10 * dim))
                Size of the considered swarm population
            - nostalgia (1.4)
                Weight of the pull of each particle towards its own best
                position found so far
            - social (1.4)
                Weight of the pull of each particle towards the best
                position found by the whole swarm

        Parameters left as None are resolved in solve(), once the dimension
        of the manifold is known. Remaining positional and keyword
        arguments are forwarded to the Solver base class.
        """
        super().__init__(*args, **kwargs)

        self._maxcostevals = maxcostevals
        self._maxiter = maxiter
        self._populationsize = populationsize
        self._nostalgia = nostalgia
        self._social = social

    def solve(self, problem, x=None):
        """
        Perform optimization using the particle swarm optimization
        algorithm.

        Arguments:
            - problem
                Pymanopt problem; its .manifold, .cost and .verbosity
                attributes are read here.
            - x=None
                Optional parameter. Initial population of elements on the
                manifold (any iterable; it is copied, so the caller's
                container is never modified). If None then an initial
                population will be randomly generated.
        Returns:
            - x
                Best point found by the swarm when a stopping criterion
                fired. If the solver's log verbosity is positive, the tuple
                (x, optlog) is returned instead.
        Raises:
            - ValueError
                If x is not iterable or the population is empty.
        """
        man = problem.manifold
        verbosity = problem.verbosity
        objective = problem.cost

        # Choose proper default algorithm parameters. We need to know about
        # the dimension of the manifold to limit the parameter range, so we
        # have to defer proper initialization until this point. The values
        # are stored on self because the base-class stopping check is
        # invoked without them.
        dim = man.dim
        if self._maxcostevals is None:
            self._maxcostevals = max(5000, 2 * dim)
        if self._maxiter is None:
            self._maxiter = max(500, 4 * dim)
        if self._populationsize is None:
            self._populationsize = min(40, 10 * dim)

        # If no initial population x is given by the user, generate one at
        # random.
        if x is None:
            x = [man.rand() for _ in range(int(self._populationsize))]
        elif not hasattr(x, "__iter__"):
            raise ValueError("The initial population x must be iterable")
        else:
            # Work on a private list: positions are reassigned by index
            # below, which must neither require a mutable input sequence
            # nor leak into the caller's container.
            x = list(x)
            if len(x) != self._populationsize:
                print("The population size was forced to the size of "
                      "the given initial population")
            self._populationsize = len(x)

        if not x:
            raise ValueError("The initial population x must not be empty")
        population = len(x)

        # Initialize personal best positions to the initial population.
        y = list(x)
        # Save a copy of the swarm at the previous iteration.
        xprev = list(x)
        # Initialize velocities for each particle.
        v = [man.randvec(xi) for xi in x]

        # Compute cost for each particle; these are also the initial
        # personal best costs.
        costs = np.array([objective(xi) for xi in x])
        fy = list(costs)
        costevals = population

        # Identify the best particle and store its cost/position.
        imin = costs.argmin()
        fbest = costs[imin]
        xbest = x[imin]

        # Iteration counter; incremented at the top of every pass, so it is
        # 1 when the stopping criteria are checked for the first time.
        iteration = 0
        time0 = time.time()

        self._start_optlog()

        while True:
            iteration += 1

            if verbosity >= 2:
                print("Cost evals: %7d\tBest cost: %+.8e" % (costevals, fbest))

            # The stopping criteria do not depend on individual particles,
            # so a single check per iteration suffices.
            stop_reason = self._check_stopping_criterion(
                time0, iter=iteration, costevals=costevals)
            if stop_reason:
                if verbosity >= 1:
                    print(stop_reason)
                    print('')
                break

            # Compute the inertia factor which we linearly decrease from
            # 0.9 to 0.4 from iteration = 0 to iteration = maxiter.
            w = 0.4 + 0.5 * (1 - iteration / self._maxiter)

            # Compute the velocities.
            for i, xi in enumerate(x):
                # New velocity of particle i, composed of three
                # contributions: the previous velocity transported to the
                # current position, a pull towards the particle's own best
                # position, and a pull towards the swarm's best position.
                inertia = w * man.transp(xprev[i], xi, v[i])
                nostalgia = rnd.rand() * self._nostalgia * man.log(xi, y[i])
                social = rnd.rand() * self._social * man.log(xi, xbest)
                v[i] = inertia + nostalgia + social

            # Backup the current swarm positions.
            xprev = list(x)

            # Update positions, personal bests and global best.
            for i, xi in enumerate(x):
                # Move particle i. The cost and the best-position
                # bookkeeping must use the *new* point, not the loop
                # variable, which still refers to the previous position.
                xnew = man.retr(xi, v[i])
                x[i] = xnew
                fxi = objective(xnew)

                # Update self-best if necessary.
                if fxi < fy[i]:
                    fy[i] = fxi
                    y[i] = xnew
                    # Update global best if necessary.
                    if fxi < fbest:
                        fbest = fxi
                        xbest = xnew
            costevals += population

        if self._logverbosity <= 0:
            return xbest

        self._stop_optlog(xbest, fbest, stop_reason, time0,
                          costevals=costevals, iter=iteration)
        return xbest, self._optlog