import os
import warnings
import multiprocessing
from functools import partial
from stark_qa.tools.api_lib.claude import complete_text_claude
from stark_qa.tools.api_lib.gpt import get_gpt_output
from stark_qa.tools.api_lib.huggingface import complete_text_hf
from stark_qa.tools.api_lib.openai_emb import get_openai_embedding, get_openai_embeddings
# Retry and parallelism settings. Each value can be overridden through the
# environment variable of the same name; the fallback is used when it is unset.
# The value is passed through int(), so a non-integer override raises
# ValueError at import time.
# NOTE(review): the *_SLEEP_TIME values are presumably seconds -- confirm
# against the API helpers that receive them as `sleep_time`.
MAX_OPENAI_RETRY = int(os.environ.get("MAX_OPENAI_RETRY", "5"))
OPENAI_SLEEP_TIME = int(os.environ.get("OPENAI_SLEEP_TIME", "60"))
MAX_CLAUDE_RETRY = int(os.environ.get("MAX_CLAUDE_RETRY", "10"))
CLAUDE_SLEEP_TIME = int(os.environ.get("CLAUDE_SLEEP_TIME", "0"))
# Default upper bound on worker processes used by `parallel_func`.
LLM_PARALLEL_NODES = int(os.environ.get("LLM_PARALLEL_NODES", "5"))
# Model names known to this module. `get_llm_output` only uses this set to
# decide whether to emit an "unregistered model" warning; routing to a backend
# is done separately by substring match on the model name.
registered_text_completion_llms = {
    # OpenAI GPT-4 family
    "gpt-4-1106-preview",
    "gpt-4-0125-preview",
    "gpt-4-turbo-preview",
    "gpt-4-turbo",
    "gpt-4-turbo-2024-04-09",
    # Anthropic Claude family
    "claude-2.1",
    "claude-3-opus-20240229",
    "claude-3-sonnet-20240229",
    "claude-3-haiku-20240307",
    # Hugging Face hosted models
    "huggingface/codellama/CodeLlama-7b-hf",
    # NOTE(review): these look like embedding models rather than text
    # completion models; `get_llm_output` has no branch that matches them and
    # raises ValueError for these names. Confirm whether they belong here.
    "text-embedding-3-small",
    "text-embedding-3-large",
    "text-embedding-ada-002",
}
def parallel_func(func, n_max_nodes=LLM_PARALLEL_NODES):
    """
    Build a wrapper that applies `func` to every item of a list in parallel.

    The wrapper distributes the inputs over a `multiprocessing.Pool`, so
    `func`, the inputs and any keyword arguments must be picklable.

    Args:
        func (callable): The function to apply to each input item. It is
            called as `func(item, **kwargs)`.
        n_max_nodes (int): Maximum number of parallel processes.

    Returns:
        callable: A function `(inputs: list, **kwargs) -> list` that returns
            the results of `func` in the same order as `inputs`.
    """
    def _parallel_func(inputs: list, **kwargs):
        # Pool(processes=0) raises ValueError, so an empty batch must be
        # answered without creating a pool at all.
        if len(inputs) == 0:
            return []

        # Bind the shared keyword arguments once; each worker then only
        # receives the per-item positional argument.
        partial_func = partial(func, **kwargs)
        # Never start more workers than there are items to process.
        processes = min(len(inputs), n_max_nodes)
        with multiprocessing.Pool(processes=processes) as pool:
            results = pool.map(partial_func, inputs)
        return results

    return _parallel_func
def get_llm_output(message,
                   model="gpt-4-0125-preview",
                   max_tokens=2048,
                   temperature=1,
                   json_object=False):
    """
    Complete a prompt using the specified model.

    The backend is selected by substring match on `model`, checked in this
    order: 'gpt-4' -> `get_gpt_output`, 'claude' -> `complete_text_claude`,
    'huggingface' -> `complete_text_hf`. The GPT and Claude backends also
    receive the module-level retry count and sleep time; the Hugging Face
    backend does not.

    Args:
        message (str or list): The input message or a list of message dicts.
        model (str): The model to use for completion.
        max_tokens (int): Maximum number of tokens to generate.
        temperature (float): Sampling temperature.
        json_object (bool): Whether to output in JSON format.

    Returns:
        str: The completed text generated by the model.

    Raises:
        ValueError: If `model` contains none of 'gpt-4', 'claude' or
            'huggingface'.

    Warns:
        UserWarning: If `model` is not in `registered_text_completion_llms`.
            The call still proceeds to backend selection.
    """
    if model not in registered_text_completion_llms:
        # stacklevel=2 attributes the warning to the caller's line rather
        # than to this helper, which is where the model name was chosen.
        warnings.warn(
            f"Model {model} is not registered. You may still be able to use it.",
            stacklevel=2,
        )

    # Arguments shared by every backend.
    kwargs = {
        'message': message,
        'model': model,
        'max_tokens': max_tokens,
        'temperature': temperature,
        'json_object': json_object,
    }

    if 'gpt-4' in model:
        kwargs.update({'max_retry': MAX_OPENAI_RETRY, 'sleep_time': OPENAI_SLEEP_TIME})
        return get_gpt_output(**kwargs)
    elif 'claude' in model:
        kwargs.update({'max_retry': MAX_CLAUDE_RETRY, 'sleep_time': CLAUDE_SLEEP_TIME})
        return complete_text_claude(**kwargs)
    elif 'huggingface' in model:
        return complete_text_hf(**kwargs)
    else:
        raise ValueError(f"Model {model} not recognized.")
# Batch variants of the text completion helpers: each takes a list of inputs
# plus shared keyword arguments and returns a list of results, using at most
# LLM_PARALLEL_NODES worker processes.
complete_texts_claude = parallel_func(
    complete_text_claude, n_max_nodes=LLM_PARALLEL_NODES
)
complete_texts_hf = parallel_func(
    complete_text_hf, n_max_nodes=LLM_PARALLEL_NODES
)
get_gpt_outputs = parallel_func(
    get_gpt_output, n_max_nodes=LLM_PARALLEL_NODES
)
get_llm_outputs = parallel_func(
    get_llm_output, n_max_nodes=LLM_PARALLEL_NODES
)