那些在R中进行并行计算的坑

blog
Author

R package build

Published

June 18, 2022

最近在疯狂做模拟数据分析,需要大量在R中进行并行运算。在这过程中被R无情的虐的体无完肤,所以整理一下踩过的坑,希望能帮助到有相同困难的盆友。

doParallelforeach

这两个包一般合并在一起使用,可以实现类似与for循环这样的结构。总的来说,foreach包还是非常有效率和简洁的。并行计算的模板是foreach(args) %dopar% {expression}

然而,我发现一些问题有:

  • 很难计算单个运算单元的运行时间。总共所有条件\times重复用的时间可以用system.time()来记录,但是对于单个循环的时间很难记录。这样的话在一个很多条件的模拟研究中很难事前估记大概要花多久。只能先进行一轮计算,然后大概估计。
  • 对于内存的占用率很高。一般来说这是fork并行运算的通病,当然如果是socket方式的话io之间的信息传递速度就是瓶颈了。当内存过高后,foreach会提示no enough vmem。一个解决方法是使用sequential和parallel共同使用的方法,这样的话内存使用率就会限制在conditions之间而不是iterations \times conditions。比如:
⌘+C
for(i in 1:iterations){
  foreach(n in 1:conditions) %dopar% {
    expression...
  }
}

multidplyr

这个包是Hadley用来填补tidyverse没有并行处理从而开发的。这个包的原理是将数据用partition()来分布到不同的clusters,然后用group_bysummarisedplyr传统方法来进行运算。然而,在尝试之后,我觉得非常不好用:

  • 使用范围非常局限。必须将所有的conditions的数据用nested方法打包成一个tibble才好使用。这样的一个超大文件会大量占用内存空间,如果你想把这样的文件保存成一个RDS文件的话,会超大无比。比如一个10000 conditions的实验,算上原始数据,设定参数和运算结果会占用1~10GB。你不会想把这些都一股脑的放进一个R session的。
  • 必须对所有的数据进行统一的运算。用summarise方法进行呈现的话,只能提供一个或多个方法的结果。而不能实现诸如 调取数据-运算-得出结果 这样三段式方法。如果不同条件要进行不同的处理的话就会有问题。
  • 并没有觉得很快。可能是这种long tibble拆分的方式局限在数据层,而没有在方法层进行并行处理。
  • 前面的foreach包有的缺点都有。

Grid Engine并行环境配置

除了软件方法,还可以用物理方法。通过合理使用高性能计算机集群(High Performance Cluster),可以实现更加强大的并行处理。我们爱荷华大学的HPC使用Argon进行任务管理。在实现上可以使用多job提交的方式来实现并行处理,比如不同的condition提供不同的jobID。学生好像可以一次性并行提交5个job tasks。

“使用HPC的一个重要的建议是将工作任务切分成小块并且请求小数量的运算核心来处理每一个小块。这样会使得任务更加高效,比如当scheduler发现有闲置的核心时就能用来运行你提交的小块任务。”(详见此篇文章)

以下是一个模板。makeArgoJobFile方法可以批量制造hpc的job文件。使用system()来让R运行命令行。使用qsub -t 1-[nreps] ****.job的bash命令来进行同一个条件重复模拟(nreps替换成重复数量,比如10000)并提交给HPC集群。

⌘+C
# loop through conditions and submit job file with replications as array
index = 0
items = n_items_per_factor[1]
for (items in n_items_per_factor){

  obs = n_obs[1]
  for (obs in n_obs){

    factors = n_Factors[1]
    for (factors in n_Factors){

      index = index+1
      job = makeArgonJobFile(n_items_per_factor = items,
                             n_obs = obs,
                             n_Factors = factors,
                             argon_Rscript_path = "~/PPMCsim/CFA/Script/argon_irt_onerep.R",
                             argon_job_path = paste0("~/PPMCsim/CFA/Results", index, ".sh"),
                             seed = initial.seed+index*100000,
                             n_reps = 1)

      write.table(job, file = paste0("~/PPMCsim/CFA/Results/argonjob_", obs, "_", factors,
                                     "_", items, ".job"), quote=FALSE, row.names=FALSE, col.names=FALSE)

      system(paste0("qsub -t 1-", n_reps, " ~/PPMCsim/CFA/Results/argonjob_", obs, "_", factors, "_", items, ".job"))

    }
  }
}

用这个方法的问题是这个方法局限于对同一个条件(condition)进行重复(repetition)。而有些项目不是重复同一个条件,而是无数条件中抽样而不重复,这样的话就无法享受qsub -t这个命令了。

结语

这篇小日志只是简单了列举了一些我最近在使用的工具。但是还有很多方法可以进行并行处理,比如用snow包和parallel包。或者使用更底层的mpi进行不同运算单元的数据交换。如果我以后学习到更多并行处理的知识,我也会继续更新的。Keep learning!

参考资料

  1. R-blogger:关于在SGE中使用snow包。https://www.r-bloggers.com/2011/06/serialize-or-turn-a-large-parallel-r-job-into-smaller-chunks-for-use-with-sge/
Back to top