#######################################

##### How to use "papply" library #####

#######################################



###############

## Example 1 ##

############### 



# Load the papply package.  Note that you do not need to load Rmpi;

# papply tries to load it automatically.

library(papply)



# Compute a list of sums over 1:i for 1 <= i <= 100.

args <- list()

for (i in 1:100){

  args[[i]] <- 1:i

 }



papply(args,sum)
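
# If an Rmpi cluster of slaves has already been spawned, papply spreads

# the calls across it; otherwise it is expected to behave like a plain

# lapply().  A minimal sketch of the cluster setup (assuming Rmpi is

# installed and the script is launched in a working MPI environment;

# the slave count of 4 is arbitrary):

#

#   library(Rmpi)

#   mpi.spawn.Rslaves(nslaves=4)

#   papply(args,sum)

#   mpi.close.Rslaves()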





################

## Example 2  ##

################



# Express "hello your_first_name your_last_name" 

# Provides an example of using the papply_commondata argument.



# Load the library

library(papply)



env <- list(greeting="Hello")

names <- list()

names[[1]] <- list(first="Sunny", last="Wang")

names[[2]] <- list(first="Mu", last="Zhu")

greet <- function(arg){

  # 'greeting' is supplied through the common data made available on each slave

  return(paste(greeting, arg$first, arg$last))

}

papply(names, greet, env)
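
# Each slave sees 'greeting' through the common data, so the call should

# return a list along the lines of:

#

#   [[1]] "Hello Sunny Wang"

#   [[2]] "Hello Mu Zhu"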





#######################

##### Bootstrap #######

#######################



# Bootstrap example:  

# Fisher's famous Iris flower data set is used, 

# where only the species virginica and versicolor are considered. 

# The species can be modeled by logistic regression as a function of petal length 

# (the other available variables are ignored). 

# Fitting the logistic regression model by maximum likelihood 

# gives the parameter estimates and their standard errors.



# It is known that maximum likelihood estimators are asymptotically 

# normally distributed. 

# We can check this assumption using a bootstrap procedure as follows:

# (1) Sample n observations with replacement from the original data, 

# where n is the number of observations. 

# (2) Fit the logistic regression model by maximum likelihood. 

# (3) Repeat this bootstrap sampling many times (B rounds). 

# (4) Use the distribution of the estimates thus computed 

# as an approximation to the 'true' sampling distribution of the estimators. 



# Load the library

library(papply)



# Load the data

data(iris)



# Extract the part of the data containing 

# the two species "virginica" and "versicolor" and 

# the input variable "petal length" (columns 3 and 5).

x<-iris[which(iris[,5]!="setosa"),c(3,5)]

# Drop the unused "setosa" factor level so that glm() sees a two-level response.

x[,2] <- droplevels(x[,2])



# The number of bootstrap samples

trials <- 10000



# Create a list of arguments, one per bootstrap trial

arguments <- list()

for (i in 1:trials) {

    arguments[[i]] <- i

 }



boot <- function(arg){



# print the index of the current trial 

print(arg)



# create an index vector for the bootstrap sample

ind <- sample(nrow(x),nrow(x),replace=TRUE)



# fit the logistic regression model by maximum likelihood 

result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))



return(list(coefficients(result1)[1], coefficients(result1)[2]))

}



# pass x as common data so that the slave processes can see it

result <- papply(arguments, boot, list(x=x))



# extract the estimates of the intercept and slope from the result

intercept1 <- as.numeric(lapply(result,function(arg){return(arg[[1]])})) 

slope1 <- as.numeric(lapply(result,function(arg){return(arg[[2]])}))



# save histograms of the estimates to a PostScript file;

# the device must be opened before plotting and closed afterwards

postscript("hist.ps")

par(mfrow=c(1,2))

hist(intercept1,breaks=40)

hist(slope1,breaks=40)

dev.off()



#################

## Neural Net  ##

#################



# Training of a neural net over a parameter space.

# You will often want to train across all combinations of a set

# of (M) decay rates, a set of (N) numbers of hidden nodes, a set 

# of (P) train/test splits-- all with some number (R) of repetitions from 

# random starting values. This means you are performing M*N*P*R independent

# trainings of a neural network, all of which could theoretically execute 

# in parallel. 



# In practice, you would limit the parallelism to only splitting across

# certain parameter sets. In the above example, we may want to run M*N 

# parallel tasks over the decay rates and the numbers of hidden nodes, 

# and have each parallel run try all train/test splits with the given 

# number of random starts. 
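
# For the concrete settings used below (M=3 decay rates, N=3 hidden node

# counts, P=10 cross-validation folds, R=5 random restarts) this means

# 3*3*10*5 = 450 network fits in total, organized as 3*3 = 9 parallel

# tasks of 10*5 = 50 fits each.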



# Read in libraries and the Boston Housing data

library(papply)

library(MASS)

data(Boston)

                                                                                

# Generate list of parameter sets

decays <- c(0.2,0.1,0.01)

n_hidden <- c(2,4,6)

parameters <- expand.grid(decays,n_hidden)

                                                                                

# Set random seeds in order to have reproducible runs.

# 100 is the seed for the main process, which generates the folds.  Each

# task will use its task number as its random seed.

main_seed <- 100

seeds <- c(1:nrow(parameters))

                                                                                

# Create list of argument sets.  Each argument is actually a list of

# decay rate, number of hidden nodes, and random seed

arguments <- list()

for (i in 1:nrow(parameters)) {

    arguments[[i]] <- list(decay=parameters[i,1],

                           hidden=parameters[i,2],

                           seed=seeds[i])

    }

                                                                                

# Need to set random seed before generating folds

# Generate random fold labels

set.seed(main_seed)

folds <- sample(rep(1:10,length=nrow(Boston)))

                                                                                

# Make a list of all shared data that should be available on every slave

# in the cluster

shared_data <- list(folds=folds,Boston=Boston)

                                                                                

                                                                                

# Create function to run on the slave nodes.

# arg is a list with decay, hidden, and seed elements

try_networks <- function(arg) {

    # Make sure the nnet library is loaded.

    # NOTE:  notice that nnet is not loaded above for the master - it doesn't

    #        need it.  It is loaded here because the slaves need it to be

    #        loaded.  library() is safe to call repeatedly, so each task

    #        simply loads nnet before building its networks.

    library("nnet")

                                                                                

    # Set the random seed to the provided value

    set.seed(arg$seed)

                                                                                

    # Set up a matrix to store the rss values from produced nets

    rss_values <- array(0,dim=c(10,5))

                                                                                

    # For each train/test combination

    for (i in 1:10) {

        # Try 5 times

        for (j in 1:5) {

            # Build a net to predict home value based on other 13 values.

            trained_net <- nnet(Boston[folds!=i,1:13], Boston[folds!=i,14],

                                size=arg$hidden, decay=arg$decay,

                                linout=TRUE)

            # Try building predictions based on the net generated.

            test <- predict(trained_net, Boston[folds==i,1:13],type="raw")

            # Compute and store the root sum of squared prediction errors (rss).

            rss <- sqrt(sum((Boston[folds==i,14] - test)^2))

            rss_values[i,j] <- rss

            }

        }

                                                                                

    # Return the lowest rss value obtained over all train/test folds

    # and random restarts for this parameter setting.

    return(min(rss_values))

    }

                                                                                

# Call the above function for all sets of parameters

results <- papply(arguments, try_networks, shared_data)

                                                                                

# Output a list of parameter vs. minimum rss values

df <- data.frame(decay=parameters[,1],hidden=parameters[,2],

                 rss=unlist(results))

df
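
# The best parameter combination can then be read off (a small optional

# convenience, not part of the original output):

df[which.min(df$rss),]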





########################

##### EM Algorithm #####

########################



# EM example: fitting a two-component mixture of one-dimensional 

# normal distributions.



# Load the data

data(iris)



# Extract the part of the data containing 

# the two species "setosa" and "versicolor" and 

# the input variable "petal length" (columns 3 and 5).

x<-iris[which(iris[,5]!="virginica"),c(3,5)]



# The expectation (E) step of the EM algorithm.

# This function computes the class membership probabilities (the z's)

# by plugging in the parameter estimates from the maximization (M) step:

# z_i = pi*f(x_i; mu1, sigma1) / (pi*f(x_i; mu1, sigma1) + (1-pi)*f(x_i; mu2, sigma2))

# First, the E step without parallelization:

EM.Estep<-function(data,pi.prop,mu,sigma,fun){

# 'data':: incomplete data set (a numeric vector) without class labels

  part1 <- pi.prop*fun(data,mu[1],sigma[1])

  part2 <- (1-pi.prop)*fun(data,mu[2],sigma[2])

  index <- part1/(part1+part2)

  return(index)

}



# The E step of EM with parallelization

EM.Estep<-function(data,pi.prop,mu,sigma,fun){



library(papply) 

# 'data':: incomplete data set without class labels

  len.row<-length(data)



# Create a list of arguments, one per observation

  arg <- list()

  for(i in 1:len.row){

    arg[[i]] <- i

  }  



 fn <- function(arg){

      row.index <- arg

      part1 <- pi.prop*fun(data[row.index],mu[1],sigma[1])

      part2 <- (1-pi.prop)*fun(data[row.index],mu[2],sigma[2])

      index <- part1/(part1+part2)

      return(index)

   }

  index <- papply(arg,fn)

  

  return(index) 

 }
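
# Example E-step call on the extracted data (illustrative only; the

# starting values 0.5, c(1,4) and c(0.5,0.5) are arbitrary guesses):

#

#   EM.Estep(x[,1], 0.5, c(1,4), c(0.5,0.5), dnorm)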

  

# The M step of EM with parallelization

EM.Mstep <- function(data,index){

   len.row <- length(data)

 

   index.sum1 <- sum(index)  

   index.sum2 <- len.row-index.sum1



# Create a list of arguments, one per observation

  arg <- list()

  for(i in 1:len.row){

     arg[[i]] <- i

  }



  fn <- function(arg){

    row.index <- arg

    part1 <- index[row.index]*data[row.index]

    part2 <- (1-index[row.index])*data[row.index]

    part3 <- index[row.index]*data[row.index]^2

    part4 <- (1-index[row.index])*data[row.index]^2

  

    return(list(part1,part2,part3,part4))

 }



 result <- papply(arg,fn)



 index.x <- as.numeric(lapply(result,function(arg){return(arg[[1]])}))

 index.1x <- as.numeric(lapply(result,function(arg){return(arg[[2]])}))

 index.x2 <- as.numeric(lapply(result,function(arg){return(arg[[3]])}))

 index.1x2 <- as.numeric(lapply(result,function(arg){return(arg[[4]])}))

 

 pi.prop <- index.sum1/len.row

 mu1 <- sum(index.x)/index.sum1

 mu2 <- sum(index.1x)/index.sum2

 sigma1 <- sqrt((sum(index.x2)-2*mu1*sum(index.x)+mu1^2*index.sum1)/index.sum1)

 sigma2 <- sqrt((sum(index.1x2)-2*mu2*sum(index.1x)+mu2^2*index.sum2)/index.sum2)



 return(list(pi.prop,c(mu1,mu2),c(sigma1,sigma2)))

}



 

# Main function of EM algorithm



main<-function(data,pi.prop,mu,sigma){

   

    log.likelihood <- sum(log(pi.prop*dnorm(data,mu[1],sigma[1])+(1-pi.prop)*dnorm(data,mu[2],sigma[2])))

    print(log.likelihood)

    log.new <- 0

    i <- 0



    while(abs(log.new-log.likelihood)>0.01){

       log.new <- log.likelihood

       

       Estep.result <- as.numeric(lapply(EM.Estep(data,pi.prop,mu,sigma,dnorm),function(arg){return(arg[[1]])}))

       

       Mstep.result <- EM.Mstep(data,Estep.result)

       

       pi.prop <- Mstep.result[[1]]

       cat("pi:",as.character(pi.prop),"\n") 

       

       mu <- Mstep.result[[2]]

       cat("mu:",as.character(mu),"\n") 

       

       sigma <- Mstep.result[[3]]

       cat("sigma:",as.character(sigma),"\n")   



       log.likelihood <- sum(log(pi.prop*dnorm(data,mu[1],sigma[1])+(1-pi.prop)*dnorm(data,mu[2],sigma[2])))

   }    



   return(list(pi.prop,mu,sigma))

 }    



# set up a random seed

set.seed(100)

# set up initial values

pi.prop <- 0.6

mu <- runif(2,min(x[,1]),max(x[,1]))

sigma <- runif(2,0,6)

main(x[,1],pi.prop,mu,sigma)
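
# As a rough sanity check (optional, not part of the original script),

# the fitted component means can be compared with the per-species means

# of the same column:

tapply(x[,1], droplevels(x[,2]), mean)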