Serve NVIDIA NIM Models with Union Actors
This tutorial shows you how to serve NVIDIA NIM-supported models using Union actors.
Once you have a Union account, install union:
pip install union
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
Then run the following commands to clone the examples repository and execute the workflow:
$ git clone https://github.com/unionai/unionai-examples
$ cd unionai-examples
$ union run --remote <path/to/file.py> <workflow_name> <params>
The source code for this example can be found in the unionai-examples repository.
By using Union actors, we ensure the model is pulled from the model registry and initialized only once. This setup guarantees the model remains available for serving as long as the actor is running, enabling “near-real-time” inference. Let’s dive in by importing the necessary libraries and modules:
import functools
import os
import union
from flytekit.extras.accelerators import A10G
from flytekitplugins.inference import NIM, NIMSecrets
from union.actor import ActorEnvironment
Create secrets
This workflow requires both a Hugging Face API key and an NGC API key. Below are the steps to set up these secrets:
Hugging Face secret
- Generate an API key: Obtain your API key from the Hugging Face website.
- Create a secret: Use the Union CLI to create the secret:
$ union create secret hf-api-key
NGC secret
- Generate an API key: Obtain your API key from the NGC website.
- Create a secret: Use the Union CLI to create the secret:
$ union create secret ngc-api-key
Image pull secret
Union’s remote image builder enables pulling images from private registries. To create an image pull secret, follow these steps:
- Log in to NVCR locally by running:
$ docker login nvcr.io
- Create an image pull secret using your local ~/.docker/config.json:
$ IMAGEPULLSECRET=$(union create imagepullsecret --registries nvcr.io -q)
- Add it as a Union secret:
$ union create secret --type image-pull-secret --value-file $IMAGEPULLSECRET nvcr-pull-creds
HF_KEY = "hf-api-key"
HF_REPO_ID = "Samhita/OrpoLlama-3-8B-Instruct"
NGC_KEY = "ngc-api-key"
NVCR_SECRET = "nvcr-pull-creds"
Define ImageSpec
We include all the necessary libraries in the ImageSpec to ensure they are available when running the workflow.
image = union.ImageSpec(
    name="nim_serve",
    builder="union",
    packages=[
        "langchain-nvidia-ai-endpoints==0.3.10",
        "langchain==0.3.25",
        "langchain-community==0.3.25",
        "arxiv==2.2.0",
        "pymupdf==1.26.1",
        "union==0.1.183",
        "flytekitplugins-inference",
    ],
    builder_options={"imagepull_secret_name": NVCR_SECRET},
)
The builder_options argument is used to pass the image pull secret to the builder.
Load documents from arXiv
In this step, we load the arXiv data using LangChain. You can adjust the top_k_results parameter to a higher value to retrieve more documents from the arXiv repository.
@union.task(cache=True, container_image=image)
def load_arxiv() -> list[list[str]]:
    from langchain_community.document_loaders import ArxivLoader

    loader = ArxivLoader(
        query="reasoning", top_k_results=10, doc_content_chars_max=8000
    )

    documents = []
    temp_documents = []
    for document in loader.lazy_load():
        temp_documents.append(document.page_content)
        if len(temp_documents) >= 10:
            documents.append(temp_documents)
            temp_documents = []
    if temp_documents:  # keep the trailing partial batch
        documents.append(temp_documents)
    return documents
We return documents in batches of up to 10 and send each batch to the model to generate summaries in parallel.
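As a quick illustration of the batching behavior (a standalone sketch, not part of the workflow), 25 input documents would be grouped into batches of 10, 10, and 5:
# Standalone sketch of the batching logic used in load_arxiv above
docs = [f"paper-{i}" for i in range(25)]
batches, current = [], []
for doc in docs:
    current.append(doc)
    if len(current) >= 10:
        batches.append(current)
        current = []
if current:  # keep the trailing partial batch
    batches.append(current)
assert [len(batch) for batch in batches] == [10, 10, 5]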
Set up NIM and actor
We instantiate the NIM plugin and set up the actor environment. We load a fine-tuned Llama 3 8B model to serve.
nim_instance = NIM(
    image="nvcr.io/nim/meta/llama3-8b-instruct:1.0.0",
    secrets=NIMSecrets(
        ngc_secret_key=NGC_KEY,
        secrets_prefix="_UNION_",
        hf_token_key=HF_KEY,
    ),
    hf_repo_ids=[HF_REPO_ID],
    lora_adapter_mem="500Mi",
    env={"NIM_PEFT_SOURCE": "/home/nvs/loras"},
)
By default, the NIM instantiation sets cpu, gpu, and mem to 1, 1, and 20Gi, respectively. You can modify these settings as needed.
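If the defaults don't fit your model, you can override them when instantiating the plugin. The sketch below is an alternative to the instantiation above, assuming the plugin exposes cpu, gpu, and mem keyword arguments matching the defaults just described; the values are illustrative:
nim_instance = NIM(
    image="nvcr.io/nim/meta/llama3-8b-instruct:1.0.0",
    secrets=NIMSecrets(
        ngc_secret_key=NGC_KEY,
        secrets_prefix="_UNION_",
        hf_token_key=HF_KEY,
    ),
    # Illustrative overrides, assuming cpu/gpu/mem kwargs per the defaults above
    cpu=2,
    gpu=1,
    mem="32Gi",
)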
To serve the NIM model efficiently, we configure the actor to launch a single replica, ensuring the model is loaded once and reused across predictions. We set a TTL (Time-To-Live) of 900 seconds, allowing the actor to remain active for 15 minutes while idle. This helps reduce cold starts and enables faster response to follow-up requests.
The model runs on an A10G GPU. The task's GPU request is set to 0 so that the GPU is allocated to the model server defined in the pod template rather than to the task container that invokes it.
actor_env = ActorEnvironment(
    name="nim-actor",
    replica_count=1,
    pod_template=nim_instance.pod_template,
    container_image=image,
    ttl_seconds=900,
    secret_requests=[
        union.Secret(key=HF_KEY),
        union.Secret(key=NVCR_SECRET),
        union.Secret(key=NGC_KEY),
    ],
    accelerator=A10G,
    requests=union.Resources(gpu="0"),
)
Generate summaries
We define an actor task to generate summaries of arXiv PDFs.
The task processes a batch of PDFs simultaneously to generate summaries. When invoked multiple times, it reuses the existing actor environment for subsequent batches.
@actor_env.task
def generate_summary(arxiv_pdfs: list[str], repo_id: str) -> list[str]:
    from langchain.prompts import PromptTemplate
    from langchain_core.output_parsers import StrOutputParser
    from langchain_nvidia_ai_endpoints import ChatNVIDIA

    os.environ["NVIDIA_API_KEY"] = union.current_context().secrets.get(key=NGC_KEY)

    llm = ChatNVIDIA(
        base_url=f"{nim_instance.base_url}/v1", model=repo_id.split("/")[1]
    )

    prompt_template = "Summarize this content: {content}"
    prompt = PromptTemplate(input_variables=["content"], template=prompt_template)

    output_parser = StrOutputParser()
    chain = prompt | llm | output_parser

    return chain.batch([{"content": arxiv_pdf} for arxiv_pdf in arxiv_pdfs])
Next, we set up a workflow that first loads the data and then summarizes it. We use a map task to generate summaries in parallel. Since the replica count is set to 1, only one map task runs at a time. However, if you increase the replica count, more tasks will run concurrently, spinning up additional models. After the first run, subsequent summarization tasks reuse the actor environment, speeding up the process. The workflow returns a list of summaries.
@union.workflow
def batch_inference_wf(repo_id: str = HF_REPO_ID) -> list[list[str]]:
    arxiv_pdfs = load_arxiv()
    return union.map_task(functools.partial(generate_summary, repo_id=repo_id))(
        arxiv_pdfs=arxiv_pdfs
    )
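To launch the workflow on Union, use the union run command from the setup section. For example, assuming the example file is saved as nim_serve.py (a hypothetical filename; adjust the path to match your checkout of unionai-examples):
$ union run --remote nim_serve.py batch_inference_wf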