1
votes

problème d'importation de dépendances lors de l'exécution de Dataflow à partir de Google Cloud Composer

J'exécute Dataflow à partir de Google Cloud Composer, le script de flux de données contient des dépendances non standard telles que zeep, googleads. qui doivent être installés sur les nœuds de travail de flux de données, je les ai donc empaquetés avec setup.py. quand j'essaye d'exécuter ceci dans un dag, le compositeur valide les fichiers de flux de données et se plaint de Aucun nom de module Zeep, googleads . J'ai donc créé pythonvirtualenvoperator et installé toutes les dépendances non standard requises et essayé d'exécuter le travail de flux de données et il se plaignait toujours de l'importation de zeep et de googleads.

Voici ma base de code:

import zeep
import googleads

{Apache-beam-code to construct dataflow pipeline}

et mon code appelable python:

def execute_dataflow(**kwargs):
        import subprocess
        TEMPLATED_COMMAND = """
                          python main.py \
                                 --runner DataflowRunner \
                                 --project {project} \
                                 --region us-central1 \
                                 --temp_location {temp_location} \
                                 --setup_file {setup_file} \
                                 --output {output} \
                                 --project_id {project_id} 
                          """.format(**kwargs)
        process = subprocess.Popen(['/bin/bash', '-c', TEMPLATED_COMMAND])
        process.wait()
        return process.returncode

Mon fichier main.py

PULL_DATA = PythonVirtualenvOperator(
    task_id=PROCESS_TASK_ID,
    python_callable=execute_dataflow,
    op_kwargs={
        'main': 'main.py',
        'project': PROJECT,
        'temp_location': 'gs://bucket/temp',
        'setup_file': 'setup.py',
        'max_num_workers': 2,
        'output': 'gs://bucket/output',
        'project_id': PROJECT_ID},
    requirements=['google-cloud-storage==1.10.0', 'zeep==3.2.0',
                  'argparse==1.4.0', 'google-cloud-kms==0.2.1',
                  'googleads==15.0.2', 'dill'],
    python_version='2.7',
    use_dill=True,
    system_site_packages=True,
    on_failure_callback=on_failure_handler,
    on_success_callback=on_success_handler,
    dag='my-dag')

Des suggestions?


6 commentaires

Lorsque vous appelez sous-processus , vous invoquez un nouveau shell bash en dehors de virtualenv, c'est-à-dire que virtualenv n'est pas activé et que ces deps ne sont pas disponibles. VirtualEnvOperator appelle déjà le sous-processus pour vous, et il génère également un script et une cmd pour appeler ce script et un moyen de passer des arguments à ce script python. Vous pouvez utiliser ce magique virtualenv_string_args global à l'intérieur du point d'entrée du flux de données pour ajouter par programme PipelineOptions au travail de flux de données, plutôt que de passer des arguments CLI. Commencez par github.com/apache/airflow/blob/ maître / airflow / opérateurs /…


Davos, merci d'avoir élaboré ceci. pouvez-vous s'il vous plaît fournir un exemple de comment pouvons-nous utiliser string_args dans PythonVirtualenvOperator , et comment les lire en python appelable? Peu confondu avec la documentation, il dit que le type de string_args est list [str], et le séparateur doit être newline. github.com/apache/airflow < / a>


En fait, je pense que le problème est le python que vous appelez, pas le nouveau shell. L'opérateur airflow installe un virtualenv dans un répertoire temporaire, et c'est à partir de là que l'exécutable python doit être appelé. Je me demande si cela pourrait fonctionner si le sous-processus avait les bonnes variables d'environnement pour le chemin ou le chemin python qui pointait vers le répertoire temporaire virtualenv.


D'accord, les documents ne sont pas clairs. Déduit de la source: vous passez une list [str] dans le paramètre string_args de PythonVirtualenvOperator puis dans votre code python, vous avez comme par magie accès à une variable globale appelée virtualenv_string_args qui est également une liste de chaînes. Entre les deux, il copie votre liste d'origine dans un fichier délimité par une nouvelle ligne, mais il reconstruit ensuite votre code dans un script qui lit le fichier args sous forme de liste. Honnêtement, je ne sais pas pourquoi ils mentionnent que le séparateur devrait être une nouvelle ligne car c'est un détail d'implémentation interne, pas une partie de l'API.


github.com/apache/airflow/blob/... / a> il ouvre ce fichier args de nouvelle ligne, analyse les lignes et remplit la liste de chaînes virtualenv_string_args . Sur Line416, il insère votre code python. C'est compliqué, mais il n'y a probablement pas d'autre moyen de le faire fonctionner. Il utilise inspect.getsource () sur votre appelable afin d'aligner votre code. Votre fonction est en ligne, mais ensuite, il y a cet appel au sous-processus pour lancer un nouveau python, un monde dans un monde, comme ce film "création", ça me fait mal à la tête.


Aussi, quand j'ai dit "c'est de là que l'exécutable python devrait être appelé" (le répertoire temporaire), vous n'avez pas besoin de vous en préoccuper, l'opérateur le gère pour vous.


3 Réponses :


0
votes

2 commentaires

Le problème n'est pas lié à l'exécution du travail de flux de données. Pour expliquer brièvement, la machine cliente à partir de laquelle le travail de flux de données est soumis doit avoir ces dépendances non standard installées. Dans mon cas, la machine cliente est une instance de compositeur qui, par défaut, n'a pas ces dépendances installées.J'utilise donc pythonvirenvoperator et je crée un virenv sur l'instance de composer et j'ai essayé de soumettre le travail de flux de données à partir du virenv nouvellement créé. Mais la planification des tâches a échoué avec aucun module nommé (zeep et google ads) .


Ah, le problème se situe donc du côté de Cloud Composer, pas du côté de Dataflow. Je ne connais pas Cloud Composer, je ne peux donc pas vous aider.



0
votes

À l'aide d'un exemple de script de pipeline Dataflow avec import googleads, zeep , j'ai mis en place un environnement de test Composer. Le DAG est comme le vôtre et j'obtiens la même erreur. Ensuite, je fais quelques changements, pour m'assurer que les dépendances peuvent être trouvées sur les machines de travail.

Dans le DAG, j'utilise un simple PythonOperator , pas un PythonVirtualenvOperator code >. J'ai mon pipeline de flux de données et mon fichier de configuration ( main.py et setup.py ) dans un bucket Google Cloud Storage , afin que Composer puisse les trouver. Le fichier d'installation a une liste d'exigences où je dois avoir par exemple zeep et googleads. J'ai adapté un exemple de fichier de configuration à partir de ici , en changeant ceci:

with models.DAG(  'composer_sample',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    PULL_DATA = PythonOperator(
    task_id='PULL_DATA',
    python_callable=execute_dataflow,
    op_kwargs={
        'main': '/home/airflow/gcs/data/main.py',
        'project': PROJECT,
        'temp_location': 'gs://dataflow-imports-test/temp',
        'setup_file': '/home/airflow/gcs/data/setup.py',
        'max_num_workers': 2,
        'output': 'gs://dataflow-imports-test/output',
        'project_id': PROJECT_ID})
    PULL_DATA

Mon DAG est

REQUIRED_PACKAGES = [
    'google-cloud-storage==1.10.0', 'zeep==3.2.0',
'argparse==1.4.0', 'google-cloud-kms==0.2.1',
'googleads==15.0.2', 'dill'
    ]

setuptools.setup(
    name='Imports test',
    version='1',
    description='Imports test workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during pip install scenarios.
        'build': build,
        'CustomCommands': CustomCommands,
        }
    )

sans modification de l'appelable Python. Cependant, avec cette configuration, j'obtiens toujours l'erreur.

Étape suivante , dans la console Google Cloud Platform (GCP), je vais dans" Composer "via le menu de navigation, puis je clique sur le nom de l'environnement. Sur l'onglet "Paquets PyPI", j'ajoute zeep et googleads, et cliquez sur "soumettre". La mise à jour de l'environnement prend un certain temps, mais cela fonctionne.

Après cette étape, mon pipeline est capable d'importer les dépendances et de s'exécuter avec succès. J'ai également essayé d'exécuter le DAG avec les dépendances indiquées sur la console GCP mais pas dans les exigences de setup.py . Et le flux de travail se rompt à nouveau, mais à des endroits différents. Assurez-vous donc de les indiquer aux deux endroits.



0 commentaires

0
votes

Vous devez installer les bibliothèques dans votre environnement Cloud Composer (consultez ce lien ). Il existe un moyen de le faire dans la console, mais je trouve ces étapes plus faciles:

  1. Ouvrez votre page des environnements
  2. Sélectionnez l'environnement réel dans lequel votre Composer s'exécute
  3. Accédez à l'onglet Packages PyPI
  4. Cliquez sur modifier
  5. Ajoutez manuellement chaque ligne de vos requirements.txt
  6. Enregistrer

Vous pouvez obtenir une erreur si la version que vous avez fournie pour une bibliothèque est trop ancienne, vérifiez donc les journaux et mettez à jour les numéros, si nécessaire.


0 commentaires