5
votes

AWS - Fonctions Step, utilisez l'entrée d'exécution dans un TuningStep

J'ai écrit un workflow de fonctions d'étape AWS simple en une seule étape:

... 
"DataSource": {
   "S3DataSource": {
   "S3DataType": "S3Prefix",
   "S3Uri": "s3://my-bucket/<stepfunctions.inputs.placeholders.ExecutionInput object at 0x12261f7d0>/data/",
    "S3DataDistributionType": "FullyReplicated"
    }
},
...

Mon objectif est que l'entrée de train soit extraite du JSON d'exécution fourni au moment de l'exécution. Lorsque j'exécute le flux de travail (à partir de la console des fonctions d'étape), en fournissant le JSON {"app_id": "My App ID"} l'étape de réglage n'obtient pas les bonnes données, à la place, elle obtient une représentation to_string des stepfunctions.inputs.placeholders.ExecutionInput . De plus, en regardant l'ASL généré, je peux voir que l'entrée d'exécution a été rendue sous forme de chaîne:

from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import Chain, TuningStep
from stepfunctions.workflow import Workflow
import train_utils


def main():
    workflow_execution_role = 'arn:aws:iam::MY ARN'
    execution_input = ExecutionInput(schema={
        'app_id': str
    })
    estimator = train_utils.get_estimator()
    tuner = train_utils.get_tuner(estimator)

    tuning_step = TuningStep(state_id="HP Tuning", tuner=tuner, data={
        'train': f's3://my-bucket/{execution_input["app_id"]}/data/'},
                             wait_for_completion=True,
                             job_name='HP-Tuning')

    workflow_definition = Chain([
        tuning_step
    ])

    workflow = Workflow(
        name='HP-Tuning',
        definition=workflow_definition,
        role=workflow_execution_role,
        execution_input=execution_input
    )
    workflow.create()


if __name__ == '__main__':
    main()

Qu'est-ce que je fais mal?

Mise à jour: comme mentionné par @yoodan, le SDK est probablement en retard, je vais donc devoir modifier la définition avant d'appeler create. Je vois qu'il existe un moyen de revoir la définition avant d'appeler create, mais puis-je modifier la définition du graphe? Comment?


1 commentaires

Si vous avez la définition, vous pouvez utiliser Boto3 pour appeler CreateStateMachine avec votre définition: boto3.amazonaws.com/v1/documentation/api/latest/reference / ...


3 Réponses :


2
votes

Le SDK python pour les fonctions d'étape génère le code correspondant, nous avons besoin d'une concaténation / format de chaîne intégré dans le langage Amazon States pour accomplir ce que vous désirez.

Récemment, en août 2020, Amazon States Language a introduit des fonctions intégrées telles que le format de chaîne dans ses spécifications de langue. https://states-language.net/#appendix-b

Malheureusement, le SDK python n'est pas à jour et ne prend pas en charge les nouvelles modifications.

Pour contourner le problème, modifiez peut-être manuellement la définition avant d'appeler le workflow create?


1 commentaires

Merci @yoodan, j'ai essayé les fonctions intrinsèques que vous avez mentionnées et cela fonctionne très bien si je le fais manuellement. Qu'entendez-vous par modify the definition before calling workflow create? ne crée pas génère la définition?



0
votes

votre définition de la fonction step est bonne et il semble que cela devrait fonctionner.
il n'est pas clair d'après votre exemple de code où l'exécution se produit réellement (vous l'avez déclaré directement à partir de la console), ce qui peut conduire à plusieurs options pouvant causer le problème, et je pense que c'est la source du problème.
veuillez fournir plus d'informations à ce sujet.

exportez-vous d'une manière ou d'une autre votre objet Workflow créé pour qu'il puisse être exécuté?
il semble que quelque chose manque ... pour vérifier la cohérence, ajoutez ce qui suit à votre fonction main :

workflow.execute(inputs={"app_id": "My App ID"})

et vérifiez à nouveau les journaux


0 commentaires

1
votes

Il semble que ce que vous recherchez n'est pas pris en charge par le SDK. https://github.com/aws/aws-step-functions-data-science-sdk-python/issues/79

Vous pouvez cependant modifier la définition avant de créer la machine à états. Voici une fonction qui va itérer la définition dict et remplacer un espace réservé entouré de {{PH}} par la bonne syntaxe des fonctions intrinsèques, par exemple:

workflow_definition = get_updated_definition(workflow.definition.to_dict())
create_state_machine(json.dumps(workflow_definition)) #<-- implement using boto3 (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.create_state_machine)

def get_updated_definition(data):
    if isinstance(data, dict):
        for k, v in data.copy().items():
            if isinstance(v, dict):  # For DICT
                data[k] = get_updated_definition(v)
            elif isinstance(v, list):  # For LIST
                data[k] = [get_updated_definition(i) for i in v]
            elif isinstance(v, str) and re.search(r'{{([a-z_]+)}}', v):  # Update Key-Value
                # data.pop(k)
                # OR
                del data[k]
                keys = re.findall(r'{{([a-z_]+)}}', v)
                data[f"{k}.$"] = f"States.Format('{re.sub(r'{{[a-z_]+}}', '{}', v)}',{','.join(['$.'+k for k in keys])})"

    return data

usage:

s3://my-bucket/{{app_id}}/data/


0 commentaires