是指一个程序的多个任务在同一时段内可以同时执行。但从微观上看,这些任务任意时刻可能并不是始终都在运行,每个工作任务都呈现出走走停停这种相互交替的状态。如三个任务在一个单核处理器上交替执行。
并行计算是指一个程序通过多个任务紧密协作来解决某个问题。
分布式计算是指一个程序需要与其他程序协作来解决某个问题。 并行程序和分布式程序都是并发的。但是并发程序并不一定都是并行的或是分布式的,因为并行或分布式程序要求系统在执行多个工作任务时拥有多核CPU体系。 并行和分布式方法计算相比于并发计算,可以从本质上提高计算速度。
并行和分布式的区别在于各个核之间的连接方式。并行计算程序往往同时在物理上紧密靠近,或是共享内存或是通过高速互联网相互连接的多个核的系统上执行多个任务,这样的连接方式较为紧密。而分布式程序往往是在多个计算机上执行,这些计算机之间相隔较远并且任务是由独立建立的程序来完成的,这样的连接是松耦合的。两者的区别还体现在任务的独立性上。在并行计算中,任务之间的独立性比较弱,小任务的计算结果会决定最终计算结果:在分布式计算中,任务间独立性强,小任务的计算结果一般不会影响最终的结果。举例来说,我们前面说到在一个电路连接的多个核的计算上同时运行多个任务是并行的:而网络搜索程序是分布式的。
将多个完整的单核处理器放到一个集成电路芯片上,这样的集成电路称为多核处理器。核成为了中央处理器(CPU)的代名词。在这样的设定下,传统的只有一个CPU的处理器称为单核系统。利用网络连接多个处理器的技术在现今得到了广泛的运用,这种技术是利用网络连接将多个处理器以一定的方式有序的组织起来。这种连接方式可以使得系统能够以更快的速度完成一项大规模的计算任务,其最主要的组成部分是计算机计算节点和节点间的通信和协作机制。节点是指一个独立的”计算机单元”,通常由多个CPU处理器/处理内核、内存、网络接口等组成。节点联网在一起构成一个并行计算机系统或超级计算机。并行计算机体系结构的发展主要体现在计算节点性能的提高以及节点间通信技术的改进两方面。
虽然构建了并行系统,但是大多数程序都是串行程序,都是以在单核系统上运行为目的所编写的。为了使程序能够在并行系统上更快地运行,就需要将串行程序改写成并行程序,或者编写一个翻译程序来自动地将串行程序翻译成并行程序,只有这样才能充分的利用并行系统的性能。为了编写并行程序,通常将一个问题分成几部分,再分别分配给各个核。可以并行
解决的问题需要具备两个特征:具有内在并行性:任务或数据可以分割。通过编写并行程序解决的问题往往都比较复杂,这些问题的并行性可能表现为数据并行、任务并行、流水线并行的混合,也就是混合并行。
两者的区别在于缓存(cache)、控制器(control)和逻辑处理单元(alu)的组合上。CPU的缓存很大,可以用来存储大量数据,并且每个逻辑处理单元可以共享缓存数据,速度很快;控制器性能强大,逻辑运算单元性能也强。可以看出CPU很适合处理需要线性计算的任务,需要用到频繁写入读取缓存数据的复杂计算,比如物理模拟。而GPU逻辑处理单元性能比CPU低,但数量多,而由于并行架构原因,每组逻辑处理单元的控制器和缓存都是独立的,带来的优势是在大量不需要复杂逻辑运算时,运算效率非常高。
R的并行可以分为:
隐式并行:隐式计算对用户隐藏了大部分细节,用户不需要知道具体数据分配方式,算法的实现或者底层的硬件资源分配。系统会根据当前的硬件资源来自动启动计算核心。(OpenBLAS,Intel MKL,NVIDIA cuBLAS,H2O等)
显性并行:显式计算则要求用户能够自己处理算例中数据划分,任务分配,计算以及最后的结果收集。因此,显式计算模式对用户的要求更高,用户不仅需要理解自己的算法,还需要对并行计算和硬件有一定的理解。现有R中的并行计算框架,如parallel (snow,multicores),Rmpi和foreach等采用的是映射式并行模型(Mapping),使用方法简单清晰,极大地简化了编程复杂度。R用户只需要将现有程序转化为*apply或者for的循环形式之后,通过简单的API替换来实现并行计算。(parallel(主打lapply应用)、foreach(主打for循环)、SupR、还有利用GPU的办法(gpuR))
parallel包的思路和lapply函数很相似,都是将输入数据分割、计算、整合结果。只不过并行计算是用到了不同的cpu内核来运算。在parallel包里,对应上述两种并行化方式有如下两个核心函数(针对于lapply函数的并行化,mclapply在windows上不能使用)。
apply函数形式为apply(X, MARGIN, FUN, …),其中X为matrix或者array,MARGIN = 1 表示第一维(行)运算,= 2 表示第一维(列)运算。=n表示第n维。
例1:
x<-matrix(rnorm(200),20,10)
apply(x,2,mean)
## [1] -0.118340152 -0.369487350 -0.082326996 -0.038320165 0.045927512
## [6] 0.271737921 -0.039529278 0.214129455 0.043615442 0.008677718
apply(x,1,sum)
## [1] 1.0255018 -2.4108477 2.6551149 -1.7775414 -2.3026210 -1.1938288
## [7] 4.3960818 -9.4695592 0.2706063 0.7505024 2.6987615 0.6737333
## [13] -1.6981927 3.3087287 -2.9760697 1.2907275 -5.1343432 4.4624364
## [19] 2.7266428 1.4258485
lapply即优美又简单:它只需要一个参数(一个vector或list),和一个以该参数为输入的函数,最后返回一个列表。你还可以添加额外的参数:
例1:
lapply(1:5,function (x)x^2)
## [[1]]
## [1] 1
##
## [[2]]
## [1] 4
##
## [[3]]
## [1] 9
##
## [[4]]
## [1] 16
##
## [[5]]
## [1] 25
例2:
lapply(1:3,function(x)c(x,x^2,x^3))
## [[1]]
## [1] 1 1 1
##
## [[2]]
## [1] 2 4 8
##
## [[3]]
## [1] 3 9 27
例3:
lapply(1:3/3,round,digits=3)
## [[1]]
## [1] 0.333
##
## [[2]]
## [1] 0.667
##
## [[3]]
## [1] 1
例4:
movies <-c("SPYDERMAN","BATMAN","VERTIGO","CHINATOWN")
movies_lower <-lapply(movies,tolower)
str(movies_lower)
## List of 4
## $ : chr "spyderman"
## $ : chr "batman"
## $ : chr "vertigo"
## $ : chr "chinatown"
例5:
x <- list(a = 1, b = 1:3, c = 10:100)
lapply(x, FUN = length)
## $a
## [1] 1
##
## $b
## [1] 3
##
## $c
## [1] 91
sapply()是lapply()的变种。它的任务是将lapply()的结果尽量的简化。lapply()总是返回一个列表。就算是所有元素长度都一样,没必要是一个列表的时候,他也给你返回列表。sapply()就会更加灵活一些,可以直接返回一个包含所有元素的向量,将它简化。
例1:
x <- list(a = 1, b = 1:3, c = 10:100)
sapply(x, FUN = length)
## a b c
## 1 3 91
sapply(x, FUN = sum)
## a b c
## 1 6 5005
例2:
sapply(1:5,function(x)x^2)
## [1] 1 4 9 16 25
例3:
sapply(1:5,function(x)c(x^2,x^3))#This outputs a matrix
## [,1] [,2] [,3] [,4] [,5]
## [1,] 1 4 9 16 25
## [2,] 1 8 27 64 125
两个函数的对比:
x <- list(a=1:4,b=rnorm(18),c=rnorm(28,1),d=rnorm(180,5))
lapply(x,mean)
## $a
## [1] 2.5
##
## $b
## [1] 0.4741625
##
## $c
## [1] 0.7322197
##
## $d
## [1] 5.08242
x <- list(a=1:4,b=rnorm(10),c=rnorm(20,1),d=rnorm(100,5))
sapply(x,mean)
## a b c d
## 2.50000000 0.04980386 0.78574179 4.93965247
sapply() also provides two additional parameters: simplify and USE.NAMES. If both of them are kept false, the output generated is the same as lappy()
sapply(1:5,function(x)x^2,simplify=FALSE,USE.NAMES=FALSE)
## [[1]]
## [1] 1
##
## [[2]]
## [1] 4
##
## [[3]]
## [1] 9
##
## [[4]]
## [1] 16
##
## [[5]]
## [1] 25
使用 parallel包,首先要初始化一个集群,这个集群的数量最好是你CPU核数-1。如果一台8核的电脑建立了数量为8的集群,那你的CPU就干不了其他事情了。所以可以这样启动一个集群:
library(parallel)
no_cores <- detectCores() -1
c1 <- makeCluster(no_cores)
####parallel包的安装
install.packages("parallel")
####clusterEvalQ() 例1:
clusterEvalQ(cl, 2 + 2)
例2: (会报错,因为 the lack of inheritance)
x <- 1
clusterEvalQ(cl, x)
例3: 因为test在主进程中并不存在,所以会报错
clusterEvalQ(cl, test <- 1)
clusterEvalQ(cl, test)
test
例4:
clusterExport(cl, "x")
clusterEvalQ(cl, x)
例5: 我们也可以用clusterEvalQ来加载包:
clusterEvalQ(cl, {
library(ggplot2)
library(stringr)
})
clusterEvalQ(cl, library(lme4))
并行与否的区别:
情况1:不使用parallel进行并行计算
fun <- function(x){
return (x+1);
}
system.time({
res <- lapply(1:5000000, fun);
});
## 用户 系统 流逝
## 3.76 0.06 3.83
情况2:使用parallel进行并行计算
#加载parallel包
library(parallel)
clnum<-detectCores()
cl <- makeCluster(clnum);
system.time({
res <- parLapply(cl, 1:5000000, fun)
});
## 用户 系统 流逝
## 1.80 0.07 2.88
stopCluster(cl);
我们可以看到运行速度得到了提升。
例1:
#Setting a base variable
library(parallel)
base <- 4
cores_number=detectCores()
clust<- makeCluster(cores_number)
#Note that this line is required so that all cores in cluster have this variable available
clusterExport(clust, "base")
#Using the parSapply() function
parSapply(clust, 1:5, function(exponent) base^exponent)
## [1] 4 16 64 256 1024
stopCluster(clust)
mclapply
ibrary(parallel)
library(MASS)
starts <- rep(100, 40)
fx <- function(nstart) kmeans(Boston, 4, nstart=nstart)
numCores <- detectCores()
numCores
使用lapply耗时:
system.time(
results <- lapply(starts, fx)
)
使用mclapply耗时:
system.time(
results <- mclapply(starts, fx, mc.cores = numCores)
)
iris数据集是常用的分类实验数据集,由Fisher1936收集整理。iris也称鸢尾花卉数据集,是一类多重变量分析的数据集。数据集包含150个数据样本,分为3类,每类50个数据,每个数据包含4个属性。可通过花萼长度,花萼宽度,花瓣长度,花瓣宽度4个属性预测鸢尾花卉属于(Setosa,Versicolour,Virginica)三个种类中的哪一类。
不使用并行方法:
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
res <- data.frame()
system.time({
trial <- 1
while(trial <= trials) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
r <- coefficients(result1)
res <- rbind(res, r)
trial <- trial + 1
}
})
## 用户 系统 流逝
## 14.70 0.80 15.52
这个循环的问题是我们按顺序执行每个试验,这意味着这台机器上的 8 个处理器中只有一个在使用。为了利用并行性,我们需要能够将我们的任务作为函数分派,每个处理器都有一个任务。为此,我们需要将任务转换为函数,然后使用*apply()R 函数族将该函数应用于集合的所有成员。在 R 中,使用apply通常比循环中的等效代码快得多。下面是重写为使用lapply()的代码。:
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
boot_fx <- function(trial) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
r <- coefficients(result1)
res <- rbind(data.frame(), r)
}
system.time({
results <- lapply(trials, boot_fx)
})
## 用户 系统 流逝
## 15.36 0.08 15.44
使用并行方法:
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
boot_fx <- function(trial) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
r <- coefficients(result1)
res <- rbind(data.frame(), r)
}
system.time({
results <- mclapply(trials, boot_fx, mc.cores = numCores)
})
foreach包是一个支持在R语言中调用多进程功能的第三方包。foreach包执行任务的核心理念与传统的apply组函数基本一致,都是与split – apply – combine一致的流程,不过foreach比传统apply组函数的优越之处在于,它可以通过调用操作系统的多核运行性能来执行并行任务,大大减少代码执行时间,提高代码执行效率。
foreach(..., #待输入的参数
.combine, #结果返回后执行的数据合并操作(c代表合并为向量,list代表合并为列表,rbind代表合并为数据框)
.packages=NULL, #在多进程共享的程序包(仅对于非系统安装包必备)
.export=NULL, #未在当前环境中定义的数据对象
.verbose=FALSE #是否打印运行信息
)
以上函数中,第1个参数是必备参数,即必须有输入参数,结果默认返回list。
library(foreach)
library(doParallel)
## 载入需要的程辑包:iterators
将循环值赋予i,进而将i作为函数sqrt的参数
foreach(i=1:3) %dopar% sqrt(i)
## Warning: executing %dopar% sequentially: no parallel backend registered
## [[1]]
## [1] 1
##
## [[2]]
## [1] 1.414214
##
## [[3]]
## [1] 1.732051
类似,我们可以指定多个循环:
foreach(a=1:3, b=rep(10, 3)) %dopar% {
a + b
}
## [[1]]
## [1] 11
##
## [[2]]
## [1] 12
##
## [[3]]
## [1] 13
上面a和b被赋予同样数量的值,如果只赋予两个值给b,而给a赋1000个值,我们得到的值同样只有2个:
foreach(a=1:1000, b=rep(10, 2)) %dopar% (a + b)
## [[1]]
## [1] 11
##
## [[2]]
## [1] 12
foreach返回的数据格式默认是list格式,有时我们需要向量格式的数据,此时可以使用.combine选项,比如用c来连接,返回的结果为向量。
foreach(i=1:3, .combine="c") %dopar% exp(i)
## [1] 2.718282 7.389056 20.085537
也可以使用”cbind”将生成的多个向量组合成矩阵,例如生成四组随机数向量,进而按列合并成矩阵:
foreach(i=1:4, .combine="cbind") %dopar% rnorm(4)
## result.1 result.2 result.3 result.4
## [1,] 1.0105756 0.07132480 0.5653578 0.9185171
## [2,] -0.4524009 0.56984769 0.5221775 1.4430227
## [3,] 1.7809888 0.08953299 1.0101483 -1.6969040
## [4,] 0.9586788 -0.44999585 -2.2270333 0.9914685
也可以使用”+“或”*“等运算符作为.combine选项的值:
foreach(i=1:4, .combine="+") %dopar% rnorm(4)
## [1] 1.05279774 -1.43855200 -0.02775815 -1.94822435
例:按列求均值
m <- matrix(rnorm(9), 3, 3)
m
## [,1] [,2] [,3]
## [1,] 0.2323120 1.6167417 0.47209730
## [2,] -0.6334353 -0.3559077 0.03508369
## [3,] 0.3199985 1.5065859 -1.00339808
colMeans(m)
## [1] -0.0270416 0.9224733 -0.1654057
m <- matrix(rnorm(9), 3, 3)
m
## [,1] [,2] [,3]
## [1,] 0.6539362 0.7016361 0.2686971
## [2,] 0.5623224 0.6770581 -0.2168249
## [3,] -0.7704142 1.5875650 -0.8658533
foreach(i=1:ncol(m), .combine=c) %dopar% mean(m[,i])
## [1] 0.1486148 0.9887531 -0.2713270
创建一个函数,输入i值,返回的是there are i apples
i=2
mission<-function(i){
content<-glue::glue("There are {i} apples")
return(content)
}
mission(i)
## There are 2 apples
用for循环实现将前5000个结果显示出来
t0<-Sys.time()
apple<-c()
for(i in 1:5000){
number<-mission(i)
apple<-c(apple,number)
}
Sys.time()-t0 #显示代码运行的时间
## Time difference of 0.6641071 secs
进一步简化代码,用lapply函数实现上述结果
lapply(1:10,FUN=mission) #注意,lapply返回的是列表
## [[1]]
## There are 1 apples
##
## [[2]]
## There are 2 apples
##
## [[3]]
## There are 3 apples
##
## [[4]]
## There are 4 apples
##
## [[5]]
## There are 5 apples
##
## [[6]]
## There are 6 apples
##
## [[7]]
## There are 7 apples
##
## [[8]]
## There are 8 apples
##
## [[9]]
## There are 9 apples
##
## [[10]]
## There are 10 apples
unlist(lapply(1:10,FUN=mission)) #变成向量
## [1] "There are 1 apples" "There are 2 apples" "There are 3 apples"
## [4] "There are 4 apples" "There are 5 apples" "There are 6 apples"
## [7] "There are 7 apples" "There are 8 apples" "There are 9 apples"
## [10] "There are 10 apples"
仍旧用lapply函数,但输入变量变为2个
t1<-Sys.time()
mission_2<-function(i,j){
content<-glue::glue("There are {i} apples, there are {j} oranges")
}
lapply(1:5,FUN=mission_2,j=1:2)
## [[1]]
## There are 1 apples, there are 1 oranges
## There are 1 apples, there are 2 oranges
##
## [[2]]
## There are 2 apples, there are 1 oranges
## There are 2 apples, there are 2 oranges
##
## [[3]]
## There are 3 apples, there are 1 oranges
## There are 3 apples, there are 2 oranges
##
## [[4]]
## There are 4 apples, there are 1 oranges
## There are 4 apples, there are 2 oranges
##
## [[5]]
## There are 5 apples, there are 1 oranges
## There are 5 apples, there are 2 oranges
Sys.time()-t1
## Time difference of 0.006372929 secs
现在用foreach实现上述结果
一个输入变量:
cores<-detectCores()#查看有多少个核
cl<-makeCluster(cores)
registerDoParallel(cl)
t1<-Sys.time()
test<-foreach(x=1:5000,.combine = "c") %dopar% mission(x)
Sys.time()-t1
## Time difference of 1.000463 secs
stopCluster(cl)
两个输入变量:
cores<-detectCores()
cl<-makeCluster(cores)
registerDoParallel(cl)
t1<-Sys.time()
x<-1:5
y<-1:2
test<-foreach(x=x,.combine = "c") %dopar% mission_2(x,y)
Sys.time()-t1
## Time difference of 0.06052589 secs
test
## [1] "There are 1 apples, there are 1 oranges"
## [2] "There are 1 apples, there are 2 oranges"
## [3] "There are 2 apples, there are 1 oranges"
## [4] "There are 2 apples, there are 2 oranges"
## [5] "There are 3 apples, there are 1 oranges"
## [6] "There are 3 apples, there are 2 oranges"
## [7] "There are 4 apples, there are 1 oranges"
## [8] "There are 4 apples, there are 2 oranges"
## [9] "There are 5 apples, there are 1 oranges"
## [10] "There are 5 apples, there are 2 oranges"
stopCluster(cl)
SupR是一款同时支持多线程和分布式计算的修改版R;
由普渡大学统计系刘传海教授开发;
力图实现基于R的大数据分析平台;
SupR在源代码级别对R进行了修改;
在可能引起线程冲突的部分加入互斥锁,保证程序正常运行;
提供完整的线程创建、查询、打断、同步和取消机制;
主要特性:
保持R的语法和内部数据结构不变
提供多线程并行计算支持
提供类似Spark的分布式集群运算
分布式文件系统支持
new.thread():创建线程
start.thread():开启线程
sync.eval():同步线程执行
wait():令线程睡眠,等待信号
notify():唤醒线程
set.seed(123)
n=10
A<-matrix(rnorm(n^2), n)
B<-matrix(rnorm(n^2), n)
th1<-new.thread({
C1<<-A%*%B[,1:(n/2)]
})
th2<-new.thread({
C2 <<-A%*%B[,(n/2+1):n]
})
start.thread(th1); start.thread(th2)
ls()
SupR目前仍处在内部试用和补充完善阶段。
gputools 基本上是最具通用性的 package,由 Michigan 大学开发。
step1:安装和配置CUDA环境,参考 CUDA zone 网站。gpu并行计算的实现方法有很多,但目前CUDA可能是支持最好,也是使用最多的。gputools,cudaBayesreg,HiPLARM,HiPLARb和gmatrix等package都依赖于CUDA后端。
step2:安装gputools
library(gputools)
举例:创建两个n*n的矩阵相乘,并查看系统运行时间
gpu.matmult <- function(n) {
A <- matrix(runif(n * n), n ,n)
B <- matrix(runif(n * n), n ,n)
tic <- Sys.time()
C <- A %*% B
toc <- Sys.time()
comp.time <- toc - tic
cat("CPU: ", comp.time, "\n")
tic <- Sys.time()
C <- gpuMatMult(A, B)
toc <- Sys.time()
comp.time <- toc - tic
cat("GPU: ", comp.time, "\n")
}
当n越大时,gpu并行的优势越明显。
rpud 是著名的 R Tutorial 网站开发的,下载界面提供了三个 package,RPUD、RPUDPLUS 和 RPUSVM。其中只有 rpud 是开源的。
举例:
test.data <- function(dim, num, seed = 17) {
set.seed(seed)
matrix(rnorm(dim * num), nrow = num)
}
m <- test.data(120, 4500)
system.time(dist(m))
library(rpud)
system.time(rpuDist(m))
安装package
install.packages("HiPLAR")
举例,沿用上述案例:
A <- matrix(runif(n * n), n ,n)
B <- matrix(runif(n * n), n ,n)
n=5000
system.time(C <- A%*%B)#查看运行时间
library(HiPLARM)
system.time(C <- A%*%B)#启用HiPLARM包后再查看运行时间,发现效率明显提升
虽然现有几个R包,因此仅限于使用NVIDIA GPU。GPUR的新颖之处在于使用了 ViennaCL 库 (http://viennacl.sourceforge.net/,该库已在 RViennaCL 软件包中重新打包,以用于其他 R 软件包。这允许用户在任何GPU上利用ViennaCL库的自动调整的OpenCL内核。它还为那些实际上拥有NVIDIA GPU以提高性能的人提供了CUDA后端,这将在配套的gpuRcuda包中提供。在上述包中,大多数都包含一组非常有限的函数,可供 R 用户在包中使用。最广泛的是 gmatrix 包,它包含大多数线性代数运算。所有软件包(gmatrix 除外)都不将数据存储在 GPU 上。因此,存在在设备和主机之间来回传输数据的开销。与 gmatrix 类似,GPUR利用 S4 类来存储指向 GPU 上数据的外部指针,这些数据镜像基矩阵和向量类。但是,考虑到R编程的交互性以及GPU上可用的有限RAM,此包提供了从GPU RAM中删除对象的中间类,以允许对象存储在CPU上,但仍根据需要使用GP22U。
install.packages("gpuR")
如果用户有兴趣使用当前的开发版本,可以直接从此github帐户安装。
devtools::install_github("cdeterman/gpuR", ref = "develop")
gpuR 具有最基本的线性代数运算。用户只需创建一个 gpuMatrix 对象,GPU 方法就会被使用。
示例:
library("gpuR")
detectGPUs()
set.seed(123)
gpuA <- gpuMatrix(rnorm(16), nrow=4, ncol=4)
gpuB <- gpuA %*% gpuA
有时,仅引用矩阵的子集并应用某些操作很有用。这是通过分别为矩阵类和向量类提供块和切片方法来实现的。生成的对象是父对象的子类(例如 gpuMatrix 中的 gpuMatrix Block)。因此,几乎所有为父级定义的方法还为子类定义了对象。
# create gpuMatric
set.seed(123)
gpuA <- gpuMatrix(rnorm(16), nrow=4, ncol=4) # create block omitting the ist row
gpuB <- block(gpuA,
rowStart = 2L, rowEnd = 4L, colStart=1L,colEnd=4L)
生成的对象是对父对象元素的引用,以避免内存重复。因此,对”块”或”切片”对象的任何更改都将更改父元素。使用深度复制函数则可避免此问题。