transforms-aip
라이브러리는 언어 모델 API를 PySpark 워크플로에 통합하는 것을 단순화합니다. 이 라이브러리를 사용하면 사용자는 Spark DataFrame 내의 데이터를 사용하여 완성 및 임베딩 모델에 대한 요청을 생성할 수 있습니다. transforms-aip
라이브러리의 기능은 다음과 같습니다:
이러한 기능은 사용자가 최소한의 설정만으로 사용할 수 있습니다.
transforms-aip
라이브러리 설치하기transforms-aip
라이브러리를 사용하기 시작하려면, 다음의 종속성을 transforms 저장소에 이 순서대로 설치해야 합니다:
palantir_models>=0.933.0
transforms-aip>=0.386.0
이 섹션에는 완성 예시와 임베딩 예시가 포함되어 있습니다.
완성을 생성하려는 주어진 데이터셋의 경우, 텍스트 열을 라이브러리에 공급하기만 하면 됩니다. 아래의 예에서는 question
이라는 텍스트 열을 사용하였습니다:
id | question |
---|---|
1 | What is the capital of Canada? |
2 | Which country has the largest population? |
3 | Name the longest river in South America. |
4 | How many states are there in the United States? |
5 | What is the name of the largest ocean on Earth? |
아래는 완성을 실행하기 위한 전체 코드 스니펫입니다; 열이 추가될 때의 단계에 대한 주석에 주목하세요:
Copied!1from pyspark.sql import functions as F 2from transforms.api import transform, Input, Output, configure 3from transforms.aip.orchestrators import ( 4 CompletionOrchestrator, 5 CompletionModelProperties, 6) 7from palantir_models.transforms import OpenAiGptChatLanguageModelInput 8 9RATE_LIMIT_PER_MIN = 100 10TOKEN_LIMIT_PER_MIN = 50000 11 12# 설정 13@configure(["NUM_EXECUTORS_2"]) 14@transform( 15 output=Output("output_dataset"), 16 questions=Input("input_dataset"), 17 # 사용할 모델을 지정하고 레포지토리에서 가져오기 18 chat_model=OpenAiGptChatLanguageModelInput( 19 "ri.language-model-service..language-model.gpt-35_azure" 20 ), 21) 22def compute(output, questions, chat_model, ctx): 23 base_prompt = "Answer this question: " 24 25 # 500개 질문 샘플링 26 sample_questions = questions.dataframe().limit(500) 27 28 # 프롬프트 구성 29 # 오케스트레이터에 전달할 question_prompt 열 추가 30 questions_with_prompt = sample_questions.withColumn( 31 "question_prompt", F.concat(F.lit(base_prompt), F.col("question")) 32 ) 33 34 # 오케스트레이터 생성 35 completions = CompletionOrchestrator( 36 RATE_LIMIT_PER_MIN, 37 TOKEN_LIMIT_PER_MIN, 38 chat_model, 39 # OpenAI 호환 속성을 전달할 수 있음 40 model_properties=CompletionModelProperties(temperature=0.6), 41 ) 42 43 # llm_answer 열에 응답 생성 44 # _completion_error 열에 응답과 관련된 문제가 발생한 경우 생성 45 answered = completions.withColumn( 46 ctx, questions_with_prompt, "question_prompt", "llm_answer" 47 ) # .cache() if doing further operations 48 49 # 결과 저장 50 output.write_dataframe(answered)
이렇게 하면 제공한 이름(llm_answer
)을 가진 답변 열과 요청이 실패한 경우를 나타내는 오류 열(_completion_error
)이 생성됩니다.
예를 들어 이 변환의 결과물은 다음과 같습니다:
id | question | question_prompt | llm_answer | _completion_error |
---|---|---|---|---|
1 | What is the capital of Canada? | 이 질문에 답하세요: 캐나다의 수도는 어디입니까? | 오타와 | null |
2 | Which country has the largest population? | 이 질문에 답하세요: 인구가 가장 많은 나라는 어디입니까? | 중국 | null |
3 | Name the longest river in South America. | 이 질문에 답하세요: 남미에서 가장 긴 강의 이름은 무엇입니까? | 아마존 강 | null |
4 | How many states are there in the United States? | 이 질문에 답하세요: 미국에는 총 몇 개의 주가 있습니까? | 50 | null |
5 | What is the name of the largest ocean on Earth? | 이 질문에 답하세요: 지구에서 가장 큰 바다의 이름은 무엇입니까? | 태평양 | null |
임베딩의 경우, 오케스트레이터를 사용하는 코드 구조는 Completion 오케스트레이터와 유사합니다. 아래와 같습니다:
Copied!1from transforms.api import transform, Input, Output, configure 2from transforms.aip.orchestrators import EmbeddingOrchestrator 3from palantir_models.transforms import GenericEmbeddingModelInput 4 5# 매 분당 요청 수 제한 6RATE_LIMIT_PER_MIN = 100 7# 매 분당 토큰 수 제한 8TOKEN_LIMIT_PER_MIN = 50000 9 10 11@configure(["NUM_EXECUTORS_2"]) 12@transform( 13 output=Output("output_dataset"), 14 questions=Input("input_dataset"), 15 # 임베딩 모델 사용을 위해 임포트해야 함 16 embedding_model=GenericEmbeddingModelInput( 17 "ri.language-model-service..language-model.text-embedding-ada-002_azure" 18 ), 19) 20def compute(output, questions, embedding_model, ctx): 21 # 1. 500개의 질문을 가져옵니다 22 sample_questions = questions.dataframe().limit(500) 23 24 # 2. 오케스트레이터를 인스턴스화합니다 25 embeddings = EmbeddingOrchestrator( 26 RATE_LIMIT_PER_MIN, 27 TOKEN_LIMIT_PER_MIN, 28 embedding_model, 29 ) 30 31 # 3. 임베딩을 실행합니다 32 # 임베딩 결과를 embedding_result 컬럼에 생성합니다 33 # 문제가 발생하면 _embeddings_error 컬럼에 기록합니다 34 questions_with_embeddings = embeddings.withColumn( 35 ctx, sample_questions, "question", "embedding_result" 36 ) 37 38 # 4. 결과를 저장합니다 39 output.write_dataframe(questions_with_embeddings)
완료 오케스트레이터와 마찬가지로, 임베딩 오케스트레이터는 임베딩 응답 열(embedding_result
)과 오류 열(_embeddings_error
)을 생성합니다:
id | question | embedding_result | _embeddings_error |
---|---|---|---|
1 | 캐나다의 수도는 무엇입니까? | [0.12, 0.54, ...] | null |
2 | 인구가 가장 많은 나라는 어디입니까? | [-0.23, 0.87, ...] | null |
3 | 남아메리카에서 가장 긴 강의 이름을 말하십시오. | [0.65, -0.48, ...] | null |
4 | 미국에는 몇 개의 주가 있습니까? | [0.33, 0.92, ...] | null |
5 | 지구상에서 가장 큰 바다의 이름은 무엇입니까? | [-0.11, 0.34, ...] | null |
실행에 대한 일반적인 정보를 얻으려면, 다음 예제와 같이 오케스트레이터의 생성자에 verbose=True
인수를 전달하십시오:
Copied!1# CompletionOrchestrator 인스턴스를 생성합니다. 2# 이 인스턴스는 주어진 속도 제한(RATE_LIMIT_PER_MIN), 토큰 제한(TOKEN_LIMIT_PER_MIN), 채팅 모델(chat_model)을 사용하여 작동합니다. 3# verbose=True는 자세한 로깅을 활성화합니다. 4completions = CompletionOrchestrator( 5 RATE_LIMIT_PER_MIN, 6 TOKEN_LIMIT_PER_MIN, 7 chat_model, 8 verbose=True, 9 )
이는 실행을 이해하는 데 도움이 되는 메타데이터 열을 결과에 추가합니다. 이러한 열에는 요청이 실행된 파티션, 요청의 타임스탬프 및 사용된 토큰이 포함됩니다.
파이프라인 후반부에서 DataFrame에 추가 작업을 수행하는 경우, 오케스트레이터 호출 끝에 .cache()
를 추가하는 것이 때때로 필요합니다.
예를 들면:
Copied!1# 'completions' 데이터프레임에 컬럼을 추가하고, 결과를 'result' 변수에 저장합니다. 2# ctx: 컨텍스트 변수 3# questions_with_prompt: 질문과 프롬프트가 포함된 데이터프레임 4# "question_prompt": 새로운 컬럼의 이름 5# "llm_answer": 추가할 데이터의 컬럼 이름 6result = completions.withColumn(ctx, questions_with_prompt, "question_prompt", "llm_answer").cache()
이 액션은 Spark Optimizer가 최적화의 일부로 호출을 실수로 제거하지 않도록 보장합니다. 응답이나 오류 없이 빈 행을 만나게 되면, 이는 종종 .cache()
를 사용해야 하는 상황을 나타냅니다.