2
votes

R: Erreur dans parLapply - $ invalid pour les vecteurs atomiques ne se produit qu'en parallèle

J'ai essayé de rechercher une question en double et je sais que beaucoup de gens ont posé des questions sur parLapply dans R, donc je m'excuse si j'en ai oublié une qui s'applique à ma situation.

Problème: j'ai la fonction suivante qui fonctionne correctement dans R mais quand j'essaye de l'exécuter en parallèle en utilisant parLapply (je suis sur une machine Windows) j'obtiens l'erreur que $ operator n'est pas valide pour les vecteurs atomiques . L'erreur mentionne que 3 nœuds ont produit les erreurs, quel que soit le nombre de nœuds sur lesquels j'ai défini mon cluster, par exemple, j'ai 8 cœurs sur mon bureau, j'ai donc défini le cluster sur 7 nœuds. Voici un exemple de code montrant où se trouve le problème:

Error in checkForRemoteErrors(val) : 
  3 nodes produced errors; first error: $ operator is invalid for atomic vectors

Cela fonctionne exactement comme il se doit, cependant lorsque j'essaye de faire ce qui suit pour fonctionner en parallèle, j'obtiens une erreur

#make the cluster
  cor <- detectCores()
  cl<-makeCluster(cor-1,type="SOCK")

#passing library and arguments
  clusterExport(cl, c("inputs"))
  clusterEvalQ(cl, {
    library(arrangements)
  })

  results <- parLapply(cl, inputs, perms)


J'obtiens l'erreur:

library(parallel)
library(doParallel)
library(arrangements)

#Function

 perms <- function(inputs)
  {
    x <- 0
    L <- 2^length(inputs$w)
     ip <- inputs$ip
    for( i in 1:L)
    {
      y <- ip$getnext()%*%inputs$w
      if (inputs$t >= y)
      {
        x <- x + 1
      }
    }
    return(x)
  }

#Inputs is a list of several other variables that are created before this 
#function runs (w, t_obs and iperm), here is a reproducible example of them
#W is derived from my data, this is just an easy way to make a reproducible example


  set.seed(1)
  m <- 15
  W <- matrix(runif(15,0,1))
  iperm <- arrangements::ipermutations(0:1, m, replace = T)
  t_obs <- 5

  inputs <- list(W,t_obs, iperm)
  names(inputs) <- c("w", "t", "ip")

#If I run the function not in parallel
perms(inputs)

#It gives a value of 27322 for this example data

Cependant, j'ai vérifié si quelque chose est un vecteur atomique en utilisant is. atomic () , et en utilisant is.recursive (inputs) il dit que c'est TRUE .

Ma question est de savoir pourquoi j'obtiens cette erreur lorsque j'essaie de l'exécuter en utilisant parLapply alors que la fonction s'exécute correctement par ailleurs et y a-t-il une raison pour laquelle "3 nœuds ont produit des erreurs" même si j'ai 7 nœuds?


2 commentaires

Peut-être une faute de frappe, mais vous ne définissez jamais m utilisé dans les ipermutations .


@ r2evans oui une faute de frappe, m est défini comme 15 ailleurs dans le code, j'ai ajouté cela. Je ne pense pas que j'aurais besoin de passer cela dans clusterExport car seul iperm en dépend et je passe iperm car il fait partie des entrées.


3 Réponses :


2
votes

Il dit "3 nœuds" car, lorsque vous le transmettez à parLapply , vous n'activez que trois nœuds. Le premier argument de parLapply doit être une liste de choses, chaque élément à passer à chaque nœud. Dans votre cas, vos entrées est une liste, correcte, mais elle est décomposée, de sorte que vos trois nœuds voient effectivement:

parLapply(cl, replicate(7, inputs, simplify=FALSE), perms)

Vous pourriez répliquer ceci sur l'hôte local (pas parallèle ) avec:

parLapply(cl, list(inputs), perms)

et quand vous le voyez comme ça, peut-être que cela devient un peu plus évident transmis à vos nœuds. (Si vous voulez voir si plus loin, faites debug (perms) puis exécutez le lapply ci-dessus, et voyez ce que les entrées à l'intérieur de cette fonction appellent ressemble à.)

Pour que cela fonctionne une fois sur un nœud (je ne pense pas que ce que vous essayez de faire), vous pouvez faire

lapply(inputs, perms)

Mais cela n'exécutera qu'une seule instance sur un nœud. Vous préféreriez peut-être faire quelque chose comme:

# node 1
perms(inputs[[1]]) # effectively inputs$w
# node 2
perms(inputs[[2]]) # effectively inputs$t
# node 3
perms(inputs[[3]]) # effectively inputs$ip
# nodes 4-7 idle


9 commentaires

C'est utile pour comprendre ce qui se passe. Je ne pensais pas que la liste serait brisée comme ça. Ceci s'exécute cependant il renvoie 27322 pour chaque nœud; J'espérais diviser le travail entre les nœuds au lieu de les faire tous faire la même chose car à mesure que m devient grand iperm (le nombre de permutations devient énorme). Pour ce faire, je suppose que je dois diviser la partie iperm des entrées en morceaux qui sont approximativement (nombre de permutations / nombre de nœuds)? Je pense que je devrais pouvoir le faire avec iperm $ getnext (nombre de perms / nombre de nœuds).


Est-ce que arrangements :: ipermutation est stochastique? Je ne suis pas familier, désolé ... peu importe, si vous voulez des résultats différents, vous devrez probablement calculer le caractère aléatoire soit (a) avant le parallèle, n fois; ou (b) dans les nœuds. Je recommande le premier, car le second introduit des défis de reproductibilité (gestion aléatoire des semences).


La partie que je "sais" est aléatoire est W , et vous ne la générez qu'une seule fois. Peut-être quelque chose comme input_list <- replicate (7, {W <- matrix (runif (...)); list (w = W, t = t_obs, ip = iperm);}, simplify = FALSE) , puis parLapply (cl, input_list, perms) . Plus j'y pense, je me demande comment iperm fonctionne sur les nœuds, car en tant qu'objet j'avais supposé que sa persistance était effectuée avec un environnement , ce qui est notamment non compatible avec les nœuds d'un cluster ...


(Je devine pourquoi le < code> iperm instance d'un objet R6 - qui est un environnement - fonctionne encore est-ce qu'il régénère peut-être son environnement sur chaque nœud ?? Cela me semble magique, pas intuitif. Il pourrait être nécessaire de générer iperm sur chaque nœud, devrait être assez trivial.)


Je ne pense pas qu'arrangements :: ipermutations soit stochastique, c'est une liste de toutes les combinaisons. W n'est en fait pas aléatoire, c'était juste un moyen facile de faire un exemple reproductible, les valeurs réelles de W sont définies à partir de mes données (mauvaise modification pour plus de clarté. C'est la persistance est un peu au-delà de ma compréhension R. Je suis actuellement juste en essayant de diviser iperm en différents morceaux et de les transmettre aux nœuds. Par exemple, avec m <- 5, il y a 2 ^ 15 = 32768 combinaisons, je peux les diviser en morceaux en utilisant chunk <- floor ((2 ^ m) / (cor-1)) puis créez simplement une liste avec tout ce dont j'ai besoin pour les différents nœuds.


Ok, donc votre appel à arrangements :: ipermutation pourrait éventuellement être responsable de 32k résultats ... je l'ai compris. Si vous pouvez créer un générateur iperm distinct pour chaque nœud, alors vous voudrez former votre inputs_list <- Map (list, t = list (t_obs), w = list (W) , ip = list_len7_of_iperms) , qui vous donnera une liste où tous les t s et w sont constants entre eux, mais le ip s sera différent.


J'ai quelques moyens pour obtenir exactement ce que je recherche mais vous avez certainement répondu à la question initiale. Très utile. Quand je serai capable de comprendre exactement comment transmettre différentes permutations à différents nœuds, je posterai cela comme une réponse juste pour tous ceux qui pourraient faire quelque chose de similaire.


Je suggérerais de faire un commentaire audacieux sur debug (perms) . Cela m'a sauvé la journée. Bien que je connaissais la commande, vous m'avez fait prendre conscience de l'utiliser dans ce contexte - et cela fonctionne même en mode parallèle !!!


Intéressant que cela fonctionne en parallèle, mais en y réfléchissant, je pense savoir pourquoi: cela a amené le R local ( ce nœud, pas un autre nœud) à déboguer cette fonction, donc cela fonctionne . Si vous insérez un browser () dans la fonction puis que vous l'exportez vers le cluster, cela ne fonctionnera pas (tous les autres nœuds échoueront) car ils n'ont pas d'interactivité avec l'utilisateur. Heureux que cela ait aidé, Christoph!



0
votes

J'ajoute une réponse au cas où quelqu'un avec un problème similaire rencontrerait cela. @ r2evans a répondu à ma question initiale qui a conduit à une prise de conscience que même la résolution des problèmes ci-dessus ne me donnerait pas le résultat souhaité (voir les commentaires de sa réponse). une fonction aux combinaisons. Cela prend beaucoup de temps car le nombre de combinaisons devient énorme. Ce que nous devons faire est de diviser les combinaisons en morceaux en fonction du nombre de cœurs que vous utiliserez pour exécuter en parallèle, puis de faire les calculs dans chaque nœud uniquement sur ce morceau spécifique des combinaisons.

Solution:

cor <- detectCores()-1
cl<-makeCluster(cor,type="SOCK")

set.seed(1)
m <- 15
W <- matrix(runif(15,0,1))
#iperm <- arrangements::ipermutations(0:1, m, replace = T)
t_obs <- 5
chunk_list <- list()
for (i in 1:cor)
{
  chunk_list[i] <- i

}
chunk_size <- floor((2^m)/(cor))
chunk_size <- c(rep(chunk_size,cor-1), (2^m)-chunk_size*(cor-1))

inputs_list <- Map(list, t=list(t_obs), w=list(W), chunk_list = chunk_list, chunk_size = list(chunk_size))

#inputs <- list(W,t_obs, iperm)
#names(inputs) <- c("w", "t", "ip", "chunk_it")




perms <- function(inputs)
{
  x <- 0
  L <- 2^length(inputs$w)
  ip <- arrangements::ipermutations(0:1, m, replace = T)

  chunk_size <- floor((2^m)/(cor))
  chunk_size <- c(rep(chunk_size,cor-1), (2^m)-chunk_size*(cor-1))

 if (inputs$chunk_list !=1)
  {
    ip$getnext(sum(chunk_size[1:inputs$chunk_list-1]))

  }


  for( i in 1:chunk_size[inputs$chunk_list])
  {
    y <- ip$getnext()%*%inputs$w
    if (inputs$t >= y)
    {
      x <- x + 1
    }

  }
  return(x)
}




clusterExport(cl, c("inputs_list", "m", "cor"))
clusterEvalQ(cl, {
  library(arrangements)
})

system.time(results <- parLapply(cl, inputs_list, perms))
Reduce(`+`, results)

Ce que j'ai fait a été de diviser le nombre total de combinaisons en différents morceaux, c'est-à-dire le premier 4681 (j'ai 7 nœuds assignés à cor), le second et ainsi de suite et fait en sorte que je ne manque aucune combinaison. Ensuite, j'ai changé ma fonction d'origine pour générer les permutations dans chaque nœud, mais pour passer essentiellement à la combinaison sur laquelle il devrait commencer à calculer, donc pour le nœud 1, il commence par la première combinaison, mais pour le nœud, il commence par le 4682 et ainsi de suite. Je travaille toujours sur l'optimisation de cela car c'est actuellement seulement environ 4 fois plus rapide que de l'exécuter en parallèle, même si j'utilise 7 cœurs. Je pense que le saut dans l'option de permutation accélérera cela mais je n'ai pas encore vérifié. J'espère que cela sera utile à quelqu'un d'autre, cela accélère mon temps estimé pour exécuter (avec m = 25, et non 15) une simulation d'environ 10 jours à environ 2,5 jours.


0 commentaires

0
votes

Vous devez transmettre dplyr aux nœuds pour résoudre ce

clusterEvalQ(clust,{library (dplyr)})

Le code ci-dessus devrait résoudre votre problème.


0 commentaires