2
votes

Traitez les gros fichiers csv et limitez les goroutines

J'essaie de trouver le meilleur moyen efficace de lire un fichier csv (~ 1M ligne). Chaque ligne contient un lien HTTP vers une image que je dois télécharger.

Voici mon code actuel utilisant des pools de nœuds de calcul:

func worker(queue chan []string, worknumber int, done, ks chan bool) {
    for true {
        select {
        case url := <-queue:
            fmt.Println("doing work!", url, "worknumber", worknumber)
            processData(url) // HTTP download
            done <- true
        case <-ks:
            fmt.Println("worker halted, number", worknumber)
            return
        }
    }
}

func main() {
    start := time.Now()
    flag.Parse()
    fmt.Print(strings.Join(flag.Args(), "\n"))
    if *filename == "REQUIRED" {
        return
    }

    csvfile, err := os.Open(*filename)
    if err != nil {
        fmt.Println(err)
        return
    }
    count, _ := lineCounter(csvfile)
    fmt.Printf("Total count: %d\n", count)
    csvfile.Seek(0, 0)

    defer csvfile.Close()

    //bar := pb.StartNew(count)
    bar := progressbar.NewOptions(count)
    bar.RenderBlank()

    reader := csv.NewReader(csvfile)

    //channel for terminating the workers
    killsignal := make(chan bool)

    //queue of jobs
    q := make(chan []string)
    // done channel takes the result of the job
    done := make(chan bool)

    numberOfWorkers := *numChannels
    for i := 0; i < numberOfWorkers; i++ {
        go worker(q, i, done, killsignal)
    }

    i := 0
    for {
        record, err := reader.Read()
        if err == io.EOF {
            break
        } else if err != nil {
            fmt.Println(err)
            return
        }
        i++

        go func(r []string, i int) {
            q <- r
            bar.Add(1)
        }(record, i)
    }

    // a deadlock occurs if c >= numberOfJobs
    for c := 0; c < count; c++ {
        <-done
    }

    fmt.Println("finished")

    // cleaning workers
    close(killsignal)
    time.Sleep(2 * time.Second)

    fmt.Printf("\n%2fs", time.Since(start).Seconds())
}

Mon problème ici est qu'il ouvre un beaucoup de goroutines, utilisez toute la mémoire et plantez.

Quelle serait la meilleure façon de la limiter?


2 commentaires

Avez-vous pensé à bufio.NewScanner (fichier) , il n'utilisera pas trop de mémoire.


votre boucle sur les lignes cvs devrait être dans des routines différentes. la routine principale devrait être responsable de vider la sortie des travailleurs et d'attendre la fin ou l'erreur pour quitter.


3 Réponses :


4
votes

Vous créez une nouvelle goroutine pour chaque ligne du fichier. Voilà pourquoi. Il n'y a aucune raison de faire cela, si vous avez déjà les travailleurs dont vous avez besoin.

Donc, en bref, changez ceci:

    q <- record
    bar.Add(1)

en ceci:

    go func(r []string, i int) {
        q <- r
        bar.Add(1)
    }(record, i)


4 commentaires

J'ai essayé mais là, il semble entrer dans une impasse. Il commence à traiter numberOfWorkers nombre de lignes, puis se bloque.


se produit parce que rien ne lit la sortie de vos nœuds de calcul tant que vous n'avez pas entièrement traité l'entrée.


Pouvez-vous m'aider à mieux comprendre votre commentaire?


Vos employés écrivent sur le canal done , mais ce n'est pas lu avant la fin du programme. C'est ce qui conduit à une impasse.



3
votes

J'ai rayé la barre de progression car je ne voulais pas m'en préoccuper, mais dans l'ensemble, c'est plus proche de ce que vous recherchez.

Elle ne gère pas vraiment les erreurs, elles échouent simplement dans un état fatal.

/ p>

J'ai ajouté la prise en charge du contexte et de l'annulation.

Vous voudrez peut-être vérifier https://godoc.org/golang.org/x/sync/ errgroup # Group.Go

En guise de recommandation générale, vous devez apprendre les modèles de golang et leur utilisation.

Il est évident que vous n'avez pas suffisamment travaillé, ou que vous êtes en train d'apprendre.

Ce n'est pas du tout le programme le plus rapide, mais il fait le travail.

Ce n'est qu'un brouillon pour vous remettre sur une meilleure direction.

package main

import (
    "context"
    "encoding/csv"
    "flag"
    "fmt"
    "io"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"
)

func worker(ctx context.Context, dst chan string, src chan []string) {
    for {
        select {
        case url, ok := <-src: // you must check for readable state of the channel.
            if !ok {
                return
            }
            dst <- fmt.Sprintf("out of %v", url) // do somethingg useful.
        case <-ctx.Done(): // if the context is cancelled, quit.
            return
        }
    }
}

func main() {

    // create a context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // that cancels at ctrl+C
    go onSignal(os.Interrupt, cancel)

    // parse command line arguments
    var filename string
    var numberOfWorkers int
    flag.StringVar(&filename, "filename", "", "src file")
    flag.IntVar(&numberOfWorkers, "c", 2, "concurrent workers")
    flag.Parse()

    // check arguments
    if filename == "" {
        log.Fatal("filename required")
    }

    start := time.Now()

    csvfile, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer csvfile.Close()

    reader := csv.NewReader(csvfile)

    // create the pair of input/output channels for the controller=>workers com.
    src := make(chan []string)
    out := make(chan string)

    // use a waitgroup to manage synchronization
    var wg sync.WaitGroup

    // declare the workers
    for i := 0; i < numberOfWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker(ctx, out, src)
        }()
    }

    // read the csv and write it to src
    go func() {
        for {
            record, err := reader.Read()
            if err == io.EOF {
                break
            } else if err != nil {
                log.Fatal(err)
            }
            src <- record // you might select on ctx.Done().
        }
        close(src) // close src to signal workers that no more job are incoming.
    }()

    // wait for worker group to finish and close out
    go func() {
        wg.Wait() // wait for writers to quit.
        close(out) // when you close(out) it breaks the below loop.
    }()

    // drain the output
    for res := range out {
        fmt.Println(res)
    }

    fmt.Printf("\n%2fs", time.Since(start).Seconds())
}

func onSignal(s os.Signal, h func()) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, s)
    <-c
    h()
}


0 commentaires

0
votes

canal mis en mémoire tampon peut vous aider à limiter les goroutines

var taskPipe = make(chan interface{}, 5)

func main(){
    go func() {
        taskPipe <- nil
        sleep
    }()
}

func sleep() {
    time.Sleep(time.Second * 5)
    <- taskPipe
}


0 commentaires