Hadoop(HDFS)을 Docker로 구축하고, Python(Flask)을 사용하여 HDFS에 파일을 저장, 조회, 삭제하는 CRUD API를 만드는 과정

1단계: Docker로 Hadoop 설치 (docker-compose)

가장 널리 사용되는 bde2020/hadoop 이미지를 기반으로 Namenode와 Datanode만 실행하는 경량화된 구성을 사용합니다. (MapReduce/YARN은 파일 CRUD에 필수적이지 않으므로 제외했습니다.)

  1. 프로젝트 폴더를 만들고 docker-compose.yml 파일을 생성합니다.

Yaml

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    container_name: namenode
    restart: always
    ports:
      - 9870:9870 # WebHDFS 및 UI 포트
      - 9000:9000 # IPC 포트
    volumes:
      - namenode_data:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
      - HDFS_CONF_dfs_permissions_enabled=false # 권한 체크 비활성화 (테스트용)

  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    container_name: datanode
    restart: always
    volumes:
      - datanode_data:/hadoop/dfs/data
    environment:
      - SERVICE_PRECONDITION=namenode:9870
    depends_on:
      - namenode

volumes:
  namenode_data:
  datanode_data:
  1. 컨테이너 실행:

    Bash

    docker-compose up -d
    • 실행 후 브라우저에서 http://localhost:9870에 접속하여 Hadoop UI가 뜨는지 확인합니다.


2단계: Python API 서버 구축 (Flask)

Hadoop과 통신하기 위해 Python의 hdfs 라이브러리(WebHDFS 클라이언트)와 웹 프레임워크인 Flask를 사용합니다.

  1. 필요한 라이브러리 설치

    Bash

    pip install flask hdfs
  2. API 코드 작성 (app.py)

Python

from flask import Flask, request, jsonify, send_file
from hdfs import InsecureClient
import io

app = Flask(__name__)

# Hadoop WebHDFS 연결 설정
# Docker가 로컬 9870포트로 포워딩 되어 있으므로 localhost:9870 사용
client = InsecureClient('http://localhost:9870', user='root')

@app.route('/')
def index():
    return "Hadoop HDFS CRUD API Server Running..."

# [R] List: 특정 디렉토리의 파일 목록 조회
@app.route('/files', methods=['GET'])
def list_files():
    path = request.args.get('path', '/') # 기본값은 루트('/')
    try:
        files = client.list(path)
        return jsonify({"path": path, "files": files})
    except Exception as e:
        return jsonify({"error": str(e)}), 404

# [R] Read: 파일 내용 읽기 (다운로드)
@app.route('/read', methods=['GET'])
def read_file():
    path = request.args.get('path')
    if not path:
        return jsonify({"error": "Path parameter is required"}), 400
    
    try:
        # HDFS에서 파일을 읽어옴
        with client.read(path) as reader:
            content = reader.read()
            return send_file(
                io.BytesIO(content),
                as_attachment=True,
                download_name=path.split('/')[-1],
                mimetype='application/octet-stream'
            )
    except Exception as e:
        return jsonify({"error": str(e)}), 404

# [C] Create: 파일 업로드
@app.route('/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400
    
    file = request.files['file']
    target_path = request.form.get('path', '/') # 저장할 HDFS 경로 (기본: 루트)
    
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400

    full_path = f"{target_path.rstrip('/')}/{file.filename}"

    try:
        # write() 함수는 data, encoding 등을 받습니다. 
        # Flask FileStorage 객체의 스트림을 바로 넘기거나 read()하여 넘깁니다.
        client.write(full_path, file.read(), overwrite=True)
        return jsonify({"message": "Upload success", "hdfs_path": full_path}), 201
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# [U] Update: 디렉토리 생성 (간단한 Update 예시)
# HDFS에서 파일 수정(Append)은 까다로우므로, 여기서는 디렉토리 생성(Mkdir)으로 대체합니다.
@app.route('/mkdir', methods=['POST'])
def make_directory():
    data = request.json
    path = data.get('path')
    
    if not path:
        return jsonify({"error": "Path is required"}), 400

    try:
        client.makedirs(path)
        return jsonify({"message": f"Directory {path} created"}), 201
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# [D] Delete: 파일 또는 디렉토리 삭제
@app.route('/delete', methods=['DELETE'])
def delete_file():
    path = request.args.get('path')
    if not path:
        return jsonify({"error": "Path parameter is required"}), 400

    try:
        # recursive=True는 디렉토리 삭제 시 내부 파일까지 삭제
        result = client.delete(path, recursive=True)
        if result:
            return jsonify({"message": f"Deleted {path}"}), 200
        else:
            return jsonify({"error": "File not found or delete failed"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True, port=5000)

3단계: 테스트 방법

Hadoop(HDFS)을 Docker로 구축하고, Python(Flask)을 사용하여 HDFS에 파일을 저장, 조회, 삭제하는 CRUD API를 만드는 과정

API 서버를 실행합니다.

Bash

python app.py

이제 curl 명령어 또는 Postman을 사용하여 테스트할 수 있습니다.

1. 디렉토리 생성 (Create/Update)

하둡 루트에 my_data라는 폴더를 만듭니다.

Bash

curl -X POST http://localhost:5000/mkdir \
     -H "Content-Type: application/json" \
     -d '{"path": "/my_data"}'

2. 파일 업로드 (Create)

로컬에 있는 test.txt 파일을 HDFS의 /my_data 폴더에 업로드합니다.
(먼저 echo “hello hadoop” > test.txt로 파일을 만드세요.)

Bash

curl -X POST http://localhost:5000/upload \
     -F "file=@test.txt" \
     -F "path=/my_data"

3. 파일 목록 조회 (Read List)

/my_data 폴더 안의 파일 목록을 봅니다.

Bash

curl "http://localhost:5000/files?path=/my_data"

4. 파일 내용 읽기 (Read Content)

업로드한 파일 내용을 확인합니다.

Bash

curl "http://localhost:5000/read?path=/my_data/test.txt"

5. 파일 삭제 (Delete)

파일을 삭제합니다.

Bash

curl -X DELETE "http://localhost:5000/delete?path=/my_data/test.txt"

요약

  1. Docker Compose를 이용해 Hadoop(Namenode, Datanode)을 실행하고 9870 포트를 엽니다.

  2. Python Flask와 hdfs 라이브러리를 사용해 WebHDFS API(9870 포트)와 통신하는 미들웨어 서버를 만듭니다.

  3. 클라이언트는 Flask API를 통해 HDFS 내부 구조를 몰라도 파일을 제어할 수 있습니다.