Esempio di modello di incorporamento di base di Ricerca di intelligenza artificiale (GTE)

Questo notebook illustra come usare lo SDK Python di AI Search, che fornisce AISearchClient come API principale per lavorare con AI Search.

Questo notebook usa le API modello di Databricks Foundation per accedere al modello di incorporamento GTE per generare incorporamenti.

%pip install --upgrade --force-reinstall databricks-ai-search
dbutils.library.restartPython()
from databricks.ai_search.client import AISearchClient

vsc = AISearchClient(disable_notice=True)
help(AISearchClient)

Caricare un set di dati toy nella tabella Delta di origine

Di seguito viene creata la tabella Delta di origine.

# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.

catalog_name = "main"
schema_name = "default"
source_table_name = "wiki_articles_demo"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
# Uncomment if you want to start from scratch.

# spark.sql(f"DROP TABLE {source_table_fullname}")
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)

Set di dati di esempio a blocchi

La suddivisione in blocchi del set di dati di esempio consente di evitare di superare il limite di contesto del modello di incorporamento. Il modello GTE supporta fino a 8192 token. Tuttavia, Databricks consiglia di suddividere i dati in blocchi di contesto più piccoli in modo da poter inserire un'ampia gamma di esempi nel modello di ragionamento per l'applicazione RAG.

import tiktoken
import pandas as pd

# The GTE model has been trained on a max context lenth of 8192 tokens.
max_chunk_tokens = 8192
encoding = tiktoken.get_encoding("cl100k_base")

def chunk_text(text):
    # Encode and then decode within the UDF
    tokens = encoding.encode(text)
    chunks = []
    while tokens:
        chunk_tokens = tokens[:max_chunk_tokens]
        chunk_text = encoding.decode(chunk_tokens)
        chunks.append(chunk_text)
        tokens = tokens[max_chunk_tokens:]
    return chunks

# Process the data and store in a new list
pandas_df = source_df.toPandas()
processed_data = []
for index, row in pandas_df.iterrows():
    text_chunks = chunk_text(row['text'])
    chunk_no = 0
    for chunk in text_chunks:
        row_data = row.to_dict()

        # replace the id column with a new unique chunk id
        # and the text column with the text chunk
        row_data['id'] = f"{row['id']}_{chunk_no}"
        row_data['text'] = chunk

        processed_data.append(row_data)
        chunk_no += 1

chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)

# Write the chunked DataFrame to a Delta table
spark.sql(f"DROP TABLE IF EXISTS {source_table_fullname}")
chunked_spark_df.write.format("delta") \
    .option("delta.enableChangeDataFeed", "true") \
    .saveAsTable(source_table_fullname)
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))

Creare un endpoint

ai_search_endpoint_name = "ai-search-demo-endpoint"
vsc.create_endpoint(
    name=ai_search_endpoint_name,
    endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
vsc.get_endpoint(
  name=ai_search_endpoint_name
)

Creare un indice

# AI Search index
vs_index = f"{source_table_name}_gte_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"

embedding_model_endpoint = "databricks-qwen3-embedding-0-6b"
index = vsc.create_delta_sync_index(
  endpoint_name=ai_search_endpoint_name,
  source_table_name=source_table_fullname,
  index_name=vs_index_fullname,
  pipeline_type='TRIGGERED',
  primary_key="id",
  embedding_source_column="text",
  embedding_model_endpoint_name=embedding_model_endpoint
)
index.describe()['status']['message']
# Wait for index to come online. Expect this command to take several minutes.
# You can also track the status of the index build in Catalog Explorer in the
# Overview tab for the index.
import time
index = vsc.get_index(endpoint_name=ai_search_endpoint_name,index_name=vs_index_fullname)
while not index.describe().get('status')['ready']:
  print("Waiting for index to be ready...")
  time.sleep(30)
print("Index is ready!")
index.describe()

Nelle celle seguenti viene illustrato come eseguire una query sull'indice per trovare documenti simili.

results = index.similarity_search(
  query_text="Greek myths",
  columns=["id", "text", "title"],
  num_results=5
  )
rows = results['result']['data_array']
for (id, text, title, score) in rows:
  if len(text) > 32:
    # trim text output for readability
    text = text[0:32] + "..."
  print(f"id: {id}  title: {title} text: '{text}' score: {score}")
# Search with a filter. Note that the syntax depends on the endpoint type.

# Standard endpoint syntax
results = index.similarity_search(
  query_text="Greek myths",
  columns=["id", "text", "title"],
  num_results=5,
  filters={"title NOT": "Hercules"}
)

# Storage-optimized endpoint syntax
# results = index.similarity_search(
#   query_text="Greek myths",
#   columns=["id", "text", "title"],
#   num_results=5,
#   filters='title != "Hercules"'
#   )


rows = results['result']['data_array']
for (id, text, title, score) in rows:
  if len(text) > 32:
    # trim text output for readability
    text = text[0:32] + "..."
  print(f"id: {id}  title: {title} text: '{text}' score: {score}")

Eliminare l'indice

vsc.delete_index(
  endpoint_name=ai_search_endpoint_name,
  index_name=vs_index_fullname
)

Notebook di esempio

Esempio di modello di incorporamento di base di Ricerca di intelligenza artificiale (GTE)

Ottieni il notebook