Je suis en train de créer un pipeline de transformation de données et de formation sur Azure Machine Leaning Service. Je voudrais enregistrer mon transformateur installé (par exemple tf-idf) dans le blob, afin que mon pipeline de prédiction puisse y accéder plus tard.
transformed_data = PipelineData("transformed_data", datastore = default_datastore, output_path_on_compute="my_project/tfidf") step_tfidf = PythonScriptStep(name = "tfidf_step", script_name = "transform.py", arguments = ['--input_data', blob_train_data, '--output_folder', transformed_data], inputs = [blob_train_data], outputs = [transformed_data], compute_target = aml_compute, source_directory = project_folder, runconfig = run_config, allow_reuse = False)
Le code ci-dessus enregistre le transformateur dans le dossier d'une exécution en cours, qui est généré dynamiquement à chaque exécution.
Je souhaite enregistrer le transformateur dans un emplacement fixe sur blob, afin de pouvoir y accéder plus tard, lors de l'appel d'un pipeline de prédiction.
J'ai essayé d'utiliser une instance de DataReference
comme sortie PythonScriptStep
, mais cela entraîne une erreur:
ValueError: Type de sortie inattendu:
C'est parce que PythonScriptStep
n'accepte que PipelineData code> ou des objets
OutputPortBinding
en tant que sorties.
Comment pourrais-je enregistrer mon transformateur adapté pour qu'il soit plus tard accessible par n'importe quel processus aribitraly (par exemple mon pipeline de prédiction)?
p>
3 Réponses :
Ce n'est probablement pas assez flexible pour vos besoins (je ne l'ai pas encore testé), mais si vous utilisez scikit-learn, une possibilité est d'inclure l'étape tf-idf / transformation dans un scikit-learn Pipeline et enregistrez-le dans votre espace de travail.
Votre script d'entraînement contiendrait donc:
model_path = Model.get_model_path('my_pipeline') # deserialize the model file back into a sklearn model model = joblib.load(model_path)
et votre script de soumission d'expérience contiendrait p >
run = exp.submit(src) run.wait_for_completion(show_output = True) model = run.register_model(model_name='my_pipeline', model_path='outputs/model.pkl')
Ensuite, vous pouvez utiliser le "modèle" enregistré et le déployer en tant que service en tant que expliqué dans la documentation , en le chargeant dans un script de notation via
pipeline = Pipeline([ ('vectorizer', TfidfVectorizer(stop_words = list(text.ENGLISH_STOP_WORDS))), ('classifier', SGDClassifier() ]) pipeline.fit(train[label].values, train[pred_label].values) # Serialize the pipeline joblib.dump(value=pipeline, filename='outputs/model.pkl')
Merci, @Davide. Je pense que c'est une excellente idée! Je vais le tester.
Une autre solution consiste à transmettre DataReference
comme entrée de votre PythonScriptStep
.
Ensuite, dans transform.py
, vous pouvez lire cette DataReference
comme argument de ligne de commande.
Vous pouvez l'analyser et l'utiliser comme n'importe quel chemin normal dans lequel enregistrer votre vectoriseur.
Par exemple. vous pouvez:
import argparse import joblib as jbl import os from sklearn.feature_extraction.text import TfidfVectorizer parser = argparse.ArgumentParser() parser.add_argument('--transformer_path', dest="transformer_path", required=True) args = parser.parse_args() tfidf = ### HERE CREATE AND TRAIN YOUR VECTORIZER ### vect_filename = os.path.join(args.transformer_path, 'my_vectorizer.jbl')
Ensuite, dans votre script ( transform.py
dans l'exemple ci-dessus), vous pouvez par exemple:
step_tfidf = PythonScriptStep(name = "tfidf_step", script_name = "transform.py", arguments = ['--input_data', blob_train_data, '--output_folder', transformed_data, '--transformer_path', trained_transformer_path], inputs = [blob_train_data, trained_transformer_path], outputs = [transformed_data], compute_target = aml_compute, source_directory = project_folder, runconfig = run_config, allow_reuse = False)
EXTRA: La troisième façon serait simplement d'enregistrer le vectoriseur comme un autre modèle dans votre espace de travail. Vous pouvez ensuite l'utiliser exactement comme n'importe quel autre modèle enregistré. (Bien que cette option n'implique pas d'écriture explicite dans le blob - comme spécifié dans la question ci-dessus)
Une autre option sera d'utiliser DataTransferStep
et de l'utiliser pour copier la sortie vers un "emplacement connu". Ce bloc-notes contient des exemples d'utilisation de DataTransferStep pour copier des données depuis et vers diverses banques de données prises en charge.
from azureml.data.data_reference import DataReference from azureml.exceptions import ComputeTargetException from azureml.core.compute import ComputeTarget, DataFactoryCompute from azureml.pipeline.steps import DataTransferStep blob_datastore = Datastore.get(ws, "workspaceblobstore") blob_data_ref = DataReference( datastore=blob_datastore, data_reference_name="knownloaction", path_on_datastore="knownloaction") data_factory_name = 'adftest' def get_or_create_data_factory(workspace, factory_name): try: return DataFactoryCompute(workspace, factory_name) except ComputeTargetException as e: if 'ComputeTargetNotFound' in e.message: print('Data factory not found, creating...') provisioning_config = DataFactoryCompute.provisioning_configuration() data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config) data_factory.wait_for_completion() return data_factory else: raise e data_factory_compute = get_or_create_data_factory(ws, data_factory_name) # Assuming output data is your output from the step that you want to copy transfer_to_known_location = DataTransferStep( name="transfer_to_known_location", source_data_reference=[output_data], destination_data_reference=blob_data_ref, compute_target=data_factory_compute ) from azureml.pipeline.core import Pipeline from azureml.core import Workspace, Experiment pipeline_01 = Pipeline( description="transfer_to_known_location", workspace=ws, steps=[transfer_to_known_location]) pipeline_run_01 = Experiment(ws, "transfer_to_known_location").submit(pipeline_01) pipeline_run_01.wait_for_completion()