मैं एडब्ल्यूएस ईएमआर क्लस्टर बनाने के लिए एयरफ्लो ईएमआर ऑपरेटरों का उपयोग कर रहा हूं जो एस 3 में निहित एक जार फ़ाइल चलाता है और फिर आउटपुट को एस 3 पर लिखता है। ऐसा लगता है कि एस 3 से जार फ़ाइल का उपयोग करके नौकरी चलाने में सक्षम है, लेकिन मैं इसे आउटपुट को एस 3 में लिखने के लिए नहीं मिल सकता। मैं इसे एडब्ल्यूएस ईएमआर सीएलआई बैश कमांड के रूप में चलाते समय एस 3 को आउटपुट लिखने में सक्षम हूं, लेकिन मुझे एयरफ्लो ईएमआर ऑपरेटरों का उपयोग करके इसे करने की ज़रूरत है। मेरे पास S3 आउटपुट निर्देशिका एयरफ्लो चरण कॉन्फ़िगरेशन और जार फ़ाइल में पर्यावरण कॉन्फ़िगरेशन दोनों में सेट है और अभी भी ऑपरेटरों को इसे लिखने के लिए नहीं मिल सकता है।

मेरे पास मेरे एयरफ्लो डीएजी के लिए कोड है:

from datetime import datetime, timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

DEFAULT_ARGS = {
    'owner': 'AIRFLOW_USER',
    'depends_on_past': False,
    'start_date':  datetime(2019, 9, 9),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

RUN_STEPS = [
    {
        "Name": "run-custom-create-emr",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster", "--master", "yarn", "--conf",
                "spark.yarn.submit.waitAppCompletion=false", "--class", "CLASSPATH",
                "s3://INPUT_JAR_FILE",
                "s3://OUTPUT_DIR"
            ]
        }
    }
]

JOB_FLOW_OVERRIDES = {
    "Name": "JOB_NAME",
    "LogUri": "s3://LOG_DIR/",
    "ReleaseLabel": "emr-5.23.0",
    "Instances": {
        "Ec2KeyName": "KP_USER_NAME",
        "Ec2SubnetId": "SUBNET",
        "EmrManagedMasterSecurityGroup": "SG-ID",
        "EmrManagedSlaveSecurityGroup": "SG-ID",
        "InstanceGroups": [
            {
                "Name": "Master nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.large",
                "InstanceCount": 1
            },
            {
                "Name": "Slave nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": "m4.large",
                "InstanceCount": 1
            }
        ],
        "TerminationProtected": True,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    "Applications": [
        {
            "Name": "Spark"
        },
        {
            "Name": "Ganglia"
        },
        {
            "Name": "Hadoop"
        },
        {
            "Name": "Hive"
        }
    ],
    "JobFlowRole": "ROLE_NAME",
    "ServiceRole": "ROLE_NAME",
    "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
    "EbsRootVolumeSize": 10,
    "Tags": [
        {
            "Key": "Country",
            "Value": "us"
        },
        {
            "Key": "Environment",
            "Value": "dev"
        }
    ]
}

dag = DAG(
    'AWS-EMR-JOB',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval=None
)

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_connection_CustomCreate',
    dag=dag
)

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=RUN_STEPS,
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

क्या किसी के पास कोई विचार है कि मैं इस समस्या को कैसे हल कर सकता हूं? किसी भी सहायता की सराहना की जाएगी।

0
mattc-7 12 सितंबर 2019, 00:04

1 उत्तर

सबसे बढ़िया उत्तर

मुझे विश्वास है कि मैंने अभी अपनी समस्या हल की है। वास्तव में सभी स्थानीय एयरफ्लो लॉग और S3 EMR लॉग में गहराई से खुदाई करने के बाद मुझे एक Hadoop मेमोरी अपवाद मिला, इसलिए मैंने EMR को चलाने के लिए कोर की संख्या में वृद्धि की और यह अब काम करने लगता है।

0
mattc-7 12 सितंबर 2019, 16:21