The Art of R Programming


22    # set up the return matrix
23    sumcounts <- matrix(rep(0,ngrps*(spacedim+1)),nrow=ngrps)
24    for (i in 1:nrow(mchunk)) {
25       dsts <- dst(mchunk[i,],t(currctrs))
26       j <- which.min(dsts)
27       sumcounts[j,] <- sumcounts[j,] + c(mchunk[i,],1)
28    }
29    sumcounts
30 }
31
32 parkm <- function(cls,m,niters,initcenters) {
33    n <- nrow(m)
34    spacedim <- ncol(m)  # what dimension space are we in?
35    # determine which worker gets which chunk of rows of m
36    options(warn=-1)
37    ichunks <- split(1:n,1:length(cls))
38    options(warn=0)
39    # form row chunks
40    mchunks <- lapply(ichunks,function(ichunk) m[ichunk,])
41    mcf <- function(mchunk) mchunk <<- mchunk
42    # send row chunks to workers; each chunk will be a global variable at
43    # the worker, named mchunk
44    invisible(clusterApply(cls,mchunks,mcf))
45    # send dst() to workers
46    clusterExport(cls,"dst")
47    # start iterations
48    centers <- initcenters
49    for (i in 1:niters) {
50       sumcounts <- clusterCall(cls,findnewgrps,centers)
51       tmp <- Reduce("+",sumcounts)
52       centers <- tmp[,1:spacedim] / tmp[,spacedim+1]
53       # if a group is empty, let's set its center to 0s
54       centers[is.nan(centers)] <- 0
55    }
56    centers
57 }
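
To see what lines 50 through 54 of the listing do with the workers' results, the combining step can be tried on its own, without a cluster. The following is only a minimal sketch: the matrices worker1 and worker2 are hypothetical stand-ins for what two workers might return from findnewgrps(), with two groups in two-dimensional space, and the numbers in them are made up for illustration.

```r
# Hypothetical per-worker results (made-up values, not from the text).
# Row j holds, for group j: sum of coordinate 1, sum of coordinate 2, count.
worker1 <- matrix(c(2, 4, 2,
                    0, 0, 0), nrow = 2, byrow = TRUE)
worker2 <- matrix(c(4, 2, 1,
                    0, 0, 0), nrow = 2, byrow = TRUE)
sumcounts <- list(worker1, worker2)   # same shape as clusterCall()'s return list
spacedim <- 2

# add the workers' matrices elementwise
tmp <- Reduce("+", sumcounts)
# divide each group's coordinate sums by that group's count
centers <- tmp[, 1:spacedim] / tmp[, spacedim + 1]
# group 2 received no points, so its row is 0/0 = NaN; reset it to 0s
centers[is.nan(centers)] <- 0
print(centers)
```

Here group 1 ends up with center (2, 2), since its coordinate sums (6, 6) are divided by its count of 3, while the empty group 2 is reset to (0, 0). The division works row by row because R recycles the count vector down the columns of the matrix.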


The code here is largely similar to our earlier mutual outlinks example.
However, there are a couple of new snow calls and a different kind of usage
of an old call.

Let’s start with lines 39 through 44. Since our matrix m does not change
from one iteration to the next, we definitely do not want to resend it to the
workers repeatedly, exacerbating the overhead problem. Thus, first we need
to send each worker its assigned chunk of m, just once. This is done in line 44
via snow’s clusterApply() function, which we used earlier but need to get
creative with here. In line 41, we define the function mcf(), which will, running
