읽기 전

  • 불필요한 코드나 잘못 작성된 내용에 대한 지적은 언제나 환영합니다.
  • 개인적으로 사용해보면서 배운 점을 정리한 글입니다.

사이드 프로젝트에서 python 기반으로 Lambda 함수를 기능별로 나눠서 관리하던 중 새롭게 주 기능을 추가하게 됨에 따라 GraphQL로 단일 Endpoint를 구성해 관리하자는 이야기가 있어 그에 관해 공부한 내용을 정리하려 합니다.

API 구조

Client(Android) - API gateway - Lambda - RDS

AWS_GraphQL_Lambda_01_01

모바일 앱에서 graphql query를 요청한다고 가정하고 서버 없이 serverless 구조에 api를 구현해 요청이 발생할 때마다 작업을 진행하는 방향으로 구현할 예정이다. 앞서 API gateway - Lambda는 Serverless | AWS API Gateway + Lambda + DynamoDB를 활용한 REST API 구축 #002 일부에서 설명을 진행했고 Lambda - RDS는 AWS Lambda에서 RDS(PostgreSQL) 연동하기에서 다루었으니 Lambda와 Android를 중점적으로 정리하려 한다.

AWS RDS(PostgreSQL) 생성

AWS RDS DB 생성 및 외부 연결하기를 참고하여 AWS RDS를 생성한다.

Graphql in Python - graphene module

GraphQL은 별도의 엔진으로 구동되지 않고 단지 쿼리 언어라고 이해하면 되겠다. GraphQL의 장점으로 기능별로 분화되어 있던 API를 한 군데서 모두 처리할 수 있음을 대표적으로 들고 있지만 REST API 역시 파라미터를 잘 설계하면 1개의 API에서 모든 기능을 동작하게 작성할 수 있다. 다만, 엄청나게 잘 작성하지 않으면 결국 꼬이게 될 위험이 많다. 그러므로 많은 Endpoint url를 관리하기 버겁다면 단일 Endpoint에서 동작하는 GraphQL API 채택도 좋은 방법이다. 많은 API가 현재 node.js로 작성되고 있는데 현재 진행 중인 사이드 프로젝트의 Lambda는 python기반으로 동작하므로 python을 채택했다. 언어 별 GraphQL 지원 목록에서 각 언어에 해당하는 GraphQL module을 볼 수 있다. python에선 Graphene module이 제일 유명하며 이 포스팅 역시 Graphene을 채택했다. GraphQL 문법에 관한 내용은 공식 레퍼런스에 정리가 너무 잘 되어 있어서 생략하기로 했다.

layer에 필요한 module 업로드

RDS(PostgreSQL)에 데이터를 저장할 예정이므로 python에서 접근할 수 있게 해주는 psycopg2-binary module과 GraphQL 기능을 사용하게 해주는 graphene module을 poetry add한 뒤 requirements.txt로 poetry로 추가했던 module과 의존성 정보를 추출한다. 이후 layer에 올리기 위해 특정 경로에 /python/python 폴더를 생성하고 pip install -r requirements.txt -t [상대 경로]/python/python을 실행하여 특정 폴더에 module들을 저장한다. 그리고 상위 python 폴더를 압축한 뒤 AWS Lambda Layer에 업로드해서 개발 환경을 구축한다.

Poetry로 Python module 추가한 뒤 requirements.txt 추출 및 pip install까지의 과정은 이전 포스팅 Poetry로 python module 관리하기에 정리하였다.

환경 변수 및 권한, VPC 설정

DB 인스턴스 호출을 위해 인스턴스 정보를 환경변수에 입력하고 IAM에서 API로 동작할 Lambda의 역할에 VPCExecution 권한을 추가해줘야 VPC 설정이 가능하다. 변수 입력과 권한 설정이 끝나면 Lambda 콘솔에서 같은 VPC로 맞춰준다. 관련 내용은 AWS Lambda에서 RDS(PostgreSQL) 연동하기에서 다뤘었다.

resolver, schema, 내부 db 호출 module 작성

Lambda 함수 디렉토리 구조는 아래와 같다. 지금은 예시용으로 test.py 하나만 작성했지만 이후 user라던지 post라던지 별개의 resolver와 schema 정의가 필요하면 각각에 대해 폴더로 구분하거나 파일로 구분한 뒤 한 개의 통합 Query와 Mutation class로 관리할 수 있도록 _init_.py를 생성해뒀다.

[Lambda 함수 이름]
├─ resolver
│   └ util
│   │ └ db.py
│   └ __init__.py # from . import test
│   └ test.py
├─ schema.py
│   └ __init__.py # from . import test
│   └ test.py
└─ lambda_function.py

db.py 코드

import inspect
import psycopg2
import os
import re

def execute(query):
    with psycopg2.connect(host=os.environ["DB_HOST"], user=os.environ["DB_USER"],
    dbname=os.environ["DB_NAME"], password=os.environ["DB_PASSWORD"]) as conn:
        with conn.cursor() as cur:
            try:
                cur.execute(query) # 쿼리 실행
                # _로 구분된 예약어 중복 제거
                cols = [des.name.rstrip('_') for des in cur.description]
                rows = cur.fetchall() # select 쿼리 실행 시 결과 조회
                # col=[a,b,c], row=[(1,2,3),(4,5,6)]를 [{a:1, b:2, c:3}, ...]로 매핑
                res = [dict(zip(cols, row)) for row in rows]
                return res
            except Exception as e:
                print(str(e))
                cur.close()

def sanitize(func):
    """
    db 인자를 다루는 함수에서 사용해야 하는 decorator
    Args:
        func: db 인자를 다루는 함수
         - func함수는 dict 형태를 인자로 받아야 함
    Returns: SQL escaping된 인자들로 치환된 func
    """
    def wrapper(*args, **kwargs):
        # default 명시된 인자 목록 확인
        default_params = (
            (k, v.default)
            for k, v in inspect.signature(func).parameters.items()
            if v.default is not inspect.Parameter.empty
        )
        # default 명시된 인자에 대해 값을 받지 못하면 default값 입력
        for k, v in default_params:
            if k not in kwargs:
                kwargs[k] = v
        """
        python의 None을 'NULL'로 치환
        python방식의 quoting 포맷을 SQL 방식으로 치환
        SQL Injection 공격 대비 특수문자 제거
        이후 새로운 인자가 반영된 함수 반환
        """
        for k, v in kwargs.items():
            if v is None:
                kwargs[k] = 'NULL'
            elif isinstance(v, str):
                v = re.sub('[\s\t\'\/\\\;=&+!#$%*"]', '', v)
                v = v.replace("'", "''")
                kwargs[k] = "'" + v + "'"
        return func(*args, **kwargs)
    return wrapper

schema/test.py 코드

graphql 문법대로 입력을 받고 출력하기 위해 schema를 정의해야 한다. 현재는 예시용으로 간단하게 string과 integer, Enum으로 클래스를 구성했으나 클래스 내부에 매핑할 다른 클래스를 정의하여 List로 담는 방법도 구현할 수 있다. Enum 타입의 경우 입력은 String 값으로 받게 되지만 이후 graphene 모듈이 값을 매핑해주기 때문에 할당된 값이 들어온다고 생각하면 된다. 여기서는 일치하는 string 입력값에 int 값으로 매핑할 것이다.

from graphene import *

# 과목을 의미하는 enum타입 정의
class SubjectType(Enum):
    MATH = 1
    ENGLISH = 2
    SCIENCE = 3
    PHYSICAL_EDUCATION = 4
# 출력값을 매핑할 class
class Score(ObjectType):
    grade = Int(required=True)
    class_code = Int(required=True)
    subject_type = SubjectType(required=True)
    name = String(required=True)
    score = Int()
# mutation 호출 시 입력값 매핑할 class
# RDS DB Table의 pk는 grade, class_code, subject_type, name
# 따라서 4개의 column에 대해 required flag ON
class PostScoreInput(InputObjectType):
    grade = Int(required=True)
    class_code = Int(required=True)
    subject_type = SubjectType(required=True)
    name = String(required=True)
    score = Int()
# query 호출 시 입력값 매핑할 class
class GetScoreInput(InputObjectType):
    grade = Int()
    class_code = Int()
    subject_type = SubjectType()
    name = String()
    score = Int()

resolver/test.py 코드

schema/test.py에서 구조를 정의했으므로 graphene API 호출 시 입력값과 출력값을 어떻게 받고 매핑해서 반환할 지 명세해야 한다. 매핑해야 하므로 정의해둔 schema를 import하고 반환 값을 구성해야 하므로 내부 db module도 import한다. 이후 데이터 조회를 수행할 함수들을 모아서 Query 클래스를 생성한다. input 값이 필수적으로 요구된다면 required flag를 세우자. class 내부에 prefix로 resolve_[변수명]으로 정의하면 graphql 쿼리 시 graphene module이 호출받은 API에서 해당 변수명을 찾아 resolve 함수를 호출한다.

from schema.test import *
from .util import db
# query할 메소드를 포함하는 Query class
class Query(ObjectType):
    get_score = Field(Score, input=GetScoreInput(required=True))
    list_score = List(Score, input=GetScoreInput(required=True))

    def resolve_get_score(root, info, input):
        return get_score(**input)

    def resolve_list_score(root, info, input):
        return list_score(**input)

# mutation할 메소드를 포함하는 Mutation class
class Mutation(ObjectType):
    create_score = Field(Score, input=PostScoreInput(required=True))
    update_score = Field(Score, input=PostScoreInput(required=True))
    delete_score = Field(Score, input=PostScoreInput(required=True))

    def resolve_create_score(root, info, input):
        return create_score(**input)

    def resolve_update_score(root, info, input):
        return update_score(**input)

    def resolve_delete_score(root, info, input):
        return delete_score(**input)

# 인자값 정리를 위해 내부 db moudule의 sanitize 데커레이터 호출
# 학년, 반, 과목, 이름, 점수 입력
@db.sanitize
def create_score(grade, class_code, subject_type, name, score=None):
    print(f'subject : {subject_type}')
    return db.execute(f"""
        INSERT INTO practice.grade_test (grade, class_code, subject_type, name_, score) VALUES
        ({grade}, {class_code}, {subject_type}, {name}, {score}) RETURNING *""")[0]

# 학년, 반, 과목, 이름에 대한 점수 조회
@db.sanitize
def get_score(grade, class_code, subject_type, name):
    return db.execute(f"""SELECT * FROM practice.grade_test
        WHERE grade={grade} AND class_code={class_code} AND subject_type={subject_type} AND name_={name}""")[0]

# 학년, 반, 이름에 대한 과목별 점수 조회
@db.sanitize
def list_score(grade, class_code, name=None):
    return db.execute(f"SELECT * FROM practice.grade_test WHERE grade={grade} AND class_code={class_code} AND name_={name}")

# 학년, 반, 과목, 이름에 대한 점수 수정
@db.sanitize
def update_score(grade, class_code, subject_type, name, score=None):
    return db.execute(f"""
        UPDATE practice.grade_test SET
        score = COALESCE({score}, -1)
    WHERE grade={grade} AND class_code={class_code} AND subject_type={subject_type} AND name_={name}
    RETURNING *;
""")[0]

# 학년, 반, 과목, 이름에 대한 점수 삭제
@db.sanitize
def delete_score(grade, class_code, subject_type, name):
    return db.execute(f"""
    DELETE FROM practice.grade_test
    WHERE grade={grade} AND class_code={class_code} AND subject_type={subject_type} AND name_={name}
    RETURNING *;
""")[0]

lambda_function.py 코드

import json
import graphene
import resolver as r

def post_query_build(grade, classCode, subjectType, name, score):
    query = """
        mutation CreateScore ($input: PostScoreInput!) {
            createScore (input: $input) {
                grade
                classCode
                subjectType
                name
                score
            }
        }
        """
    variables = {'input': {'grade': grade, 'classCode': classCode, 'subjectType': subjectType, 'name': name, 'score': score}}
    return query, variables

def update_query_build(grade, classCode, subjectType, name, score):
    query = """
        mutation UpdateScore ($input: PostScoreInput!) {
            updateScore (input: $input) {
                grade
                classCode
                subjectType
                name
                score
            }
        }
        """
    variables = {'input': {'grade': grade, 'classCode': classCode, 'subjectCode': subjectType, 'name': name, 'score': score}}
    return query, variables

def delete_query_build(grade, classCode, subjectType, name):
    query = """
        mutation DeleteScore ($input: PostScoreInput!) {
            deleteScore (input: $input) {
                grade
                classCode
                subjectType
                name
                score
            }
        }
        """
    variables = {'input': {'grade': grade, 'classCode': classCode, 'subjectType': subjectType, 'name': name}}
    return query, variables

def get_query_build(grade, classCode, subjectType, name):
    query = """
        query GetScore ($input: GetScoreInput!) {
            getScore (input: $input) {
                grade
                classCode
                subjectType
                name
                score
            }
        }
        """
    variables = {'input': {'grade': grade, 'classCode': classCode, 'subjectType': subjectType, 'name': name}}
    return query, variables

def list_query_build(grade, classCode, name):
    query = """
        query ListScore ($input: GetScoreInput!) {
            listScore (input: $input) {
                subjectType
                name
                score
            }
        }
        """
    variables = {'input': {'grade': grade, 'classCode': classCode, 'name': name}}
    return query, variables

server = graphene.Schema(query=r.test.Query, mutation=r.test.Mutation)

def lambda_handler(event, context):

    try:
        """
        input = {'grade':1, 'classCode':1, 'subjectType':'SCIENCE', 'name':'khg', 'score':12345}
        query, params = post_query_build(**input)
        """
        """
        input = {'grade':1, 'classCode':1, 'subjectType':'MATH', 'name':khg'}
        query, params = get_query_build(**input)
        """
        """
        input = {'grade':1, 'classCode':1, 'name':khg
        query, params = list_query_build(**input)
        """
        """
        input = {'grade':1, 'classCode':1, 'subjectType':'ENGLISH', 'name':'khg', 'score':54321}
        query, params = update_query_build(**input)
        """
        """
        input = {'grade':1, 'classCode':1, 'subjectType':'MATH', 'name':'khg'}
        query, params = delete_query_build(**input)
        """
        query = event['query']
        params = {'input':event['input']}
        result = server.execute(query, variables=params)
        return result.data
    except Exception as e:
        print(str(e))

혹시 몰라 각 기능별로 예제 쿼리와 그에 맞는 input 값을 주석으로 처리했다. 실제 사용에서는 client로부터 입력받아야 하므로 event json에서 받아와야 한다. 미리 어떤 key로 query와 variables를 전송할 지 결정해두자. 이 포스팅에서는 query 키는 'query', varialbes 키는 'input'으로 결정했다.

API Gateway stage 생성

나름의 엔진 역할을 해줄 Lambda 코드 작성을 끝냈으니 Client에서 요청할 수 있도록 Endpoint를 구성해야 한다. Get 메소드던 Post 메소드던 상관은 없으나 그래도 쿼리문과 파라미터를 받기 때문에 여기서는 Post 메소드로 stage를 publish 했다. 이후 Android App에서 AWS API Gateway가 제공한 Post 메소드 Endpoint URL로 request를 보내면 된다.

다음 포스팅에서는 Android App에서 GraphQL API 호출을 해보기로 한다.

+ Recent posts