J'essaie de trouver le meilleur moyen efficace de lire un fichier csv (~ 1M ligne). Chaque ligne contient un lien HTTP vers une image que je dois télécharger.
Voici mon code actuel utilisant des pools de nœuds de calcul:
func worker(queue chan []string, worknumber int, done, ks chan bool) {
for true {
select {
case url := <-queue:
fmt.Println("doing work!", url, "worknumber", worknumber)
processData(url) // HTTP download
done <- true
case <-ks:
fmt.Println("worker halted, number", worknumber)
return
}
}
}
func main() {
start := time.Now()
flag.Parse()
fmt.Print(strings.Join(flag.Args(), "\n"))
if *filename == "REQUIRED" {
return
}
csvfile, err := os.Open(*filename)
if err != nil {
fmt.Println(err)
return
}
count, _ := lineCounter(csvfile)
fmt.Printf("Total count: %d\n", count)
csvfile.Seek(0, 0)
defer csvfile.Close()
//bar := pb.StartNew(count)
bar := progressbar.NewOptions(count)
bar.RenderBlank()
reader := csv.NewReader(csvfile)
//channel for terminating the workers
killsignal := make(chan bool)
//queue of jobs
q := make(chan []string)
// done channel takes the result of the job
done := make(chan bool)
numberOfWorkers := *numChannels
for i := 0; i < numberOfWorkers; i++ {
go worker(q, i, done, killsignal)
}
i := 0
for {
record, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
fmt.Println(err)
return
}
i++
go func(r []string, i int) {
q <- r
bar.Add(1)
}(record, i)
}
// a deadlock occurs if c >= numberOfJobs
for c := 0; c < count; c++ {
<-done
}
fmt.Println("finished")
// cleaning workers
close(killsignal)
time.Sleep(2 * time.Second)
fmt.Printf("\n%2fs", time.Since(start).Seconds())
}
Mon problème ici est qu'il ouvre un beaucoup de goroutines, utilisez toute la mémoire et plantez.
Quelle serait la meilleure façon de la limiter?
3 Réponses :
Vous créez une nouvelle goroutine pour chaque ligne du fichier. Voilà pourquoi. Il n'y a aucune raison de faire cela, si vous avez déjà les travailleurs dont vous avez besoin.
Donc, en bref, changez ceci:
q <- record
bar.Add(1)
en ceci:
go func(r []string, i int) {
q <- r
bar.Add(1)
}(record, i)
J'ai essayé mais là, il semble entrer dans une impasse. Il commence à traiter numberOfWorkers nombre de lignes, puis se bloque.
se produit parce que rien ne lit la sortie de vos nœuds de calcul tant que vous n'avez pas entièrement traité l'entrée.
Pouvez-vous m'aider à mieux comprendre votre commentaire?
Vos employés écrivent sur le canal done , mais ce n'est pas lu avant la fin du programme. C'est ce qui conduit à une impasse.
J'ai rayé la barre de progression car je ne voulais pas m'en préoccuper, mais dans l'ensemble, c'est plus proche de ce que vous recherchez.
Elle ne gère pas vraiment les erreurs, elles échouent simplement dans un état fatal.
/ p>
J'ai ajouté la prise en charge du contexte et de l'annulation.
Vous voudrez peut-être vérifier https://godoc.org/golang.org/x/sync/ errgroup # Group.Go
En guise de recommandation générale, vous devez apprendre les modèles de golang et leur utilisation.
Il est évident que vous n'avez pas suffisamment travaillé, ou que vous êtes en train d'apprendre.
Ce n'est pas du tout le programme le plus rapide, mais il fait le travail.
Ce n'est qu'un brouillon pour vous remettre sur une meilleure direction.
package main
import (
"context"
"encoding/csv"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"sync"
"time"
)
func worker(ctx context.Context, dst chan string, src chan []string) {
for {
select {
case url, ok := <-src: // you must check for readable state of the channel.
if !ok {
return
}
dst <- fmt.Sprintf("out of %v", url) // do somethingg useful.
case <-ctx.Done(): // if the context is cancelled, quit.
return
}
}
}
func main() {
// create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// that cancels at ctrl+C
go onSignal(os.Interrupt, cancel)
// parse command line arguments
var filename string
var numberOfWorkers int
flag.StringVar(&filename, "filename", "", "src file")
flag.IntVar(&numberOfWorkers, "c", 2, "concurrent workers")
flag.Parse()
// check arguments
if filename == "" {
log.Fatal("filename required")
}
start := time.Now()
csvfile, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer csvfile.Close()
reader := csv.NewReader(csvfile)
// create the pair of input/output channels for the controller=>workers com.
src := make(chan []string)
out := make(chan string)
// use a waitgroup to manage synchronization
var wg sync.WaitGroup
// declare the workers
for i := 0; i < numberOfWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
worker(ctx, out, src)
}()
}
// read the csv and write it to src
go func() {
for {
record, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
log.Fatal(err)
}
src <- record // you might select on ctx.Done().
}
close(src) // close src to signal workers that no more job are incoming.
}()
// wait for worker group to finish and close out
go func() {
wg.Wait() // wait for writers to quit.
close(out) // when you close(out) it breaks the below loop.
}()
// drain the output
for res := range out {
fmt.Println(res)
}
fmt.Printf("\n%2fs", time.Since(start).Seconds())
}
func onSignal(s os.Signal, h func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, s)
<-c
h()
}
canal mis en mémoire tampon peut vous aider à limiter les goroutines
var taskPipe = make(chan interface{}, 5)
func main(){
go func() {
taskPipe <- nil
sleep
}()
}
func sleep() {
time.Sleep(time.Second * 5)
<- taskPipe
}
Avez-vous pensé à
bufio.NewScanner (fichier), il n'utilisera pas trop de mémoire.votre boucle sur les lignes cvs devrait être dans des routines différentes. la routine principale devrait être responsable de vider la sortie des travailleurs et d'attendre la fin ou l'erreur pour quitter.