요약 : 외부 스크립트(셀레니움으로 제작된)를 실행시키기 위한 dag 작성.
우선 셀레니움 기능을 하는 코드 대신, 메모장을 생성하는 코드로 테스트 했다.
배경지식
- UTF-8 인코딩 : 유니코드를 사용하는 인코딩 방식으로, 세계의 거의 모든 문자 체계를 하나의 인코딩으로 표현할 수 있습니다. 따라서 다양한 언어나 특수 문자를 포함할 가능성이 있는 파일을 다룰 때는 encoding='utf-8'을 사용하는 것이 안전하고 효과적인 방법이다. 다양한 시스템과 플랫폼 간에 호환되는 방식으로 데이터를 전송하거나 저장할 수 있다.
- Base64 인코딩 : 기존 ASCII 코드는 시스템간 데이터를 전달하기에 안전하지 않다. 모든 Binary 데이터가 ASCII 코드에 포함되지 않으므로 제대로 읽지 못한다. Base64는 제어문자와 일부 특수문자를 제외한 53개의 안전한 문자만 사용하여 ASCII 문자열로 인코딩합니다. 이는 데이터가 이메일 본문, URL 파라미터, 웹 API, HTML 폼 데이터 등으로 안전하게 전송될 수 있도록 합니다. 바이너리 데이터를 안전하게 문자 기반 프로토콜로 전송해야 할 때 발생할 수 있는 문제(예: 데이터 손상, 인코딩 오류)를 방지할 수 있다.
요구기능
- 도커 컨테이너에 구축된 airflow가 외부(윈도우)에 ssh로 접속한다.
- 접속 후 윈도우내 (크롤링을 위한) 셀레니움 코드를 실행 시킨다.
- CI/CD를 위해 윈도우내 코드는 airflow의 dag에 의해 매번 생성(갱신)돼야 한다.
dag 내용
import base64
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from datetime import datetime, timedelta
import os
# 기본 DAG 설정 파라미터를 설정합니다. 이것은 DAG가 실행될 때 기본적으로 사용될 여러 설정값을 담고 있습니다.
default_args = {
'owner': 'airflow', # DAG의 소유자 이름
'depends_on_past': False, # 이전 실행의 성공 여부에 따라 현재 실행을 결정하는지 여부
'start_date': datetime(2023, 6, 12), # DAG가 시작하는 날짜
'email_on_failure': False, # 실패 시 이메일 알림 설정
'email_on_retry': False, # 재시도 시 이메일 알림 설정
'retries': 1, # 실패 시 재시도 횟수
'retry_delay': timedelta(minutes=5), # 재시도 간 대기 시간
}
# 로컬 파일 시스템에서 Python 스크립트의 내용을 읽어오는 코드입니다.
script_path = os.path.join(os.path.dirname(__file__), 'my_script.py')
with open(script_path, 'r', encoding='utf-8') as file:
script_content = file.read()
# 제한된 : ASCII 문자열로만 구성된 문자열
# 문자열은 이미 python에서 유니코드 형태로 저장돼있음, 이를 utf-8로 변환하는것
# script_content.encode('utf-8') : 문자열 -> (utf-8화된) 바이트
# base64.b64encode(...): (utf-8화된) 바이트 -> (Base64 인코딩된) 바이트
# .decode('utf-8'): (Base64 인코딩된) 바이트 -> (전송에 안전한) 문자열
encoded_script = base64.b64encode(script_content.encode('utf-8')).decode('utf-8')
# DAG를 정의합니다. DAG는 특정 작업들의 그룹이며, 이 작업들은 설정된 스케줄에 따라 실행됩니다.
with DAG(
'execute_external_script_dag_ssh',
default_args=default_args,
description='SSH를 통해 외부 스크립트를 실행하는 DAG',
schedule_interval='0 9 * * *', # 매일 오전 9시에 실행
catchup=False, # 과거 누락된 실행을 건너뜁니다.
) as dag:
# SSH를 통해 외부 시스템에 디렉토리를 만드는 작업입니다.
create_directory = SSHOperator(
task_id='create_directory',
ssh_conn_id='ssh_default',
command='if not exist C:\\script mkdir C:\\script', # Windows 조건부 디렉토리 생성 명령어
)
# 인코딩된 스크립트를 파일로 만드는 작업입니다.
create_script_file = SSHOperator(
task_id='create_script_file',
ssh_conn_id='ssh_default',
command=f'echo {encoded_script} > C:\\script\\my_script.b64',
)
# 인코딩된 스크립트 파일을 디코딩하여 실제 Python 파일로 변환하는 작업입니다.
decode_script_file = SSHOperator(
task_id='decode_script_file',
ssh_conn_id='ssh_default',
command='del C:\\script\\my_script.py & certutil -decode C:\\script\\my_script.b64 C:\\script\\my_script.py',
)
# 디코딩된 Python 스크립트를 실행하는 작업입니다.
run_external_script = SSHOperator(
task_id='run_external_script',
ssh_conn_id='ssh_default',
command='python C:\\script\\my_script.py', # Windows 경로에서 Python 스크립트 실행 명령어
)
# 작업들의 실행 순서를 정의합니다.
create_directory >> create_script_file >> decode_script_file >> run_external_script
해결한 자잘한 오류
- c 드라이브 내 script 폴더가 이미 생성돼있을때, 생성되지 않는 오류 발생
-> if not exist로 폴더가 없을때만 생성되게 바꿈 - 전송 단계에서 인코딩 문제로 코드가 이상한 문자로 제작됨
->base64로 (전송에 안전한) 문자열로 변환
'다용도 로컬 파이프라인 구성' 카테고리의 다른 글
Poetry + 도커컨테이너 모듈 의존성 간편화2 (2) | 2024.07.16 |
---|---|
airflow.cfg 상세 분석 (0) | 2024.07.01 |
도커 컨테이너에서 외부 셀레니움 실행 3 (0) | 2024.06.23 |
도커 컨테이너에서 외부 셀레니움 실행 1 (0) | 2024.06.13 |
Poetry + 도커컨테이너 모듈 의존성 간편화1 (1) | 2024.06.12 |