Compare commits

...

4 Commits

Author SHA1 Message Date
ebfea97af7 [add] youtube plugin, [improve] initial prompt (JSON)
All checks were successful
continuous-integration/drone/push Build is passing
2023-11-10 09:19:24 -05:00
ca8c306534 [add] better error handling
All checks were successful
continuous-integration/drone/push Build is passing
2023-11-08 20:52:29 -05:00
3168bfffd1 Merge pull request 'Add Plugins' (#1) from function_plugins into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #1
2023-11-09 00:31:51 +00:00
7f0d74458d [add] migrate chromadb to plugin
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2023-11-08 18:35:56 -05:00
15 changed files with 309 additions and 194 deletions

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@ __pycache__
.DS_Store
.direnv
data
datasets
venv
openai_key
ha_key

101
README.md
View File

@@ -13,12 +13,58 @@
---
AI Chat Bot with Plugins (Home Assistant, Vehicle Lookup, DuckDuckGo Search)
AI Chat Bot with Plugins (RAG VectorDB - ChromaDB, DuckDuckGo Search, Home Assistant, Vehicle Lookup, YouTube)
[![Build Status](https://drone.va.reichard.io/api/badges/evan/minyma/status.svg)](https://drone.va.reichard.io/evan/minyma)
## Plugins
### ChromeDB Embeddings / Vectors
This utilizes a local embeddings DB. This allows you to ask the assistant
about local information. [Utilizes Retrieval-Augmented Generation (RAG)](https://arxiv.org/abs/2005.11401).
```
User: What are some common symptoms of COVID-19?
Assistant: Some common symptoms of COVID-19 mentioned in the context are
fatigue, headache, dyspnea (shortness of breath), anosmia (loss of
sense of smell), lower respiratory symptoms, cardiac symptoms,
concentration or memory issues, tinnitus and earache, and peripheral
neuropathy symptoms.
```
**NOTE:** Instructions on how to load this with your own information are in the
"Normalizing & Loading Data" section. We include a PubMed data normalizer as an
example.
### YouTube
This utilizes `yt-dlp` to download a videos subtitles. Ask questions about YouTube videos!
```
User: Tell me about this youtube video: https://www.youtube.com/watch?v=ZWgr7qP6yhY
Assistant: The YouTube video you provided is a review of the new MacBook Pro by
Apple. The host discusses the laptop's features, including its new
color and chip. They mention that the laptop still retains its ports,
HDMI, and high-quality display, but also notes some shortcomings like
the notch and lack of face unlock. The host shares their impressions
of the new black color [...]
```
### DuckDuckGo
This utilizes DuckDuckGo Search by scraping the top 5 results.
```
User: Tell me about Evan Reichard
Assistant: Evan Reichard is a Principal Detection and Response Engineer based
in the Washington DC-Baltimore Area. He has been in this role since
August 2022. Evan has created a browser extension that helps SOC
analysts and saves them over 300 hours per month. Additionally,
there are three professionals named Evan Reichard on LinkedIn and
there are also profiles of people named Evan Reichard on Facebook.
```
### Vehicle Lookup API
This utilizes Carvana's undocumented API to lookup details on a vehicle.
@@ -41,25 +87,12 @@ User: Turn on the living room lights
Assistant: The living room lights have been turned on successfully.
```
### DuckDuckGo
This utilizes DuckDuckGo Search by scraping the top 5 results.
```
User: Tell me about Evan Reichard
Assistant: Evan Reichard is a Principal Detection and Response Engineer based
in the Washington DC-Baltimore Area. He has been in this role since
August 2022. Evan has created a browser extension that helps SOC
analysts and saves them over 300 hours per month. Additionally,
there are three professionals named Evan Reichard on LinkedIn and
there are also profiles of people named Evan Reichard on Facebook.
```
## Running Server
```bash
# Locally (See "Development" Section)
export OPENAI_API_KEY=`cat openai_key`
export CHROMA_DATA_PATH=/data
export HOME_ASSISTANT_API_KEY=`cat ha_key`
export HOME_ASSISTANT_URL=https://some-url.com
@@ -69,7 +102,7 @@ minyma server run
docker run \
-p 5000:5000 \
-e OPENAI_API_KEY=`cat openai_key` \
-e DATA_PATH=/data \
-e CHROMA_DATA_PATH=/data \
-v ./data:/data \
gitea.va.reichard.io/evan/minyma:latest
```
@@ -87,10 +120,10 @@ To normalize data, you can use Minyma's `normalize` CLI command:
```bash
minyma normalize \
--filename ./pubmed_manuscripts.jsonl \
--normalizer pubmed \
--database chroma \
--datapath ./chroma
--datapath ./data \
--filename ./datasets/pubmed_manuscripts.jsonl
```
The above example does the following:
@@ -106,9 +139,11 @@ The above example does the following:
## Configuration
| Environment Variable | Default Value | Description |
| -------------------- | ------------- | ---------------------------------------------------------------------------------- |
| OPENAI_API_KEY | NONE | Required OpenAI API Key for ChatGPT access. |
| DATA_PATH | ./data | The path to the data directory. Chroma will store its data in the `chroma` subdir. |
| ---------------------- | ------------- | ----------------------------------- |
| OPENAI_API_KEY | NONE | Required OpenAI API Key for ChatGPT |
| CHROMA_DATA_PATH | NONE | ChromaDB Persistent Data Director |
| HOME_ASSISTANT_API_KEY | NONE | Home Assistant API Key |
| HOME_ASSISTANT_URL | NONE | Home Assistant Instance URL |
# Development
@@ -120,31 +155,9 @@ python3 -m venv venv
# Local Development
pip install -e .
# Creds
# Creds & Other Environment Variables
export OPENAI_API_KEY=`cat openai_key`
# Docker
make docker_build_local
```
# Notes
This is the first time I'm doing anything LLM related, so it was an adventure.
Initially I was entertaining OpenAI's Embedding API with plans to load embeddings
into Pinecone, however initial calculations with `tiktoken` showed that generating
embeddings would cost roughly $250 USD.
Fortunately I found [Chroma](https://www.trychroma.com/), which basically solved
both of those issues. It allowed me to load in the normalized data and automatically
generated embeddings for me.
In order to fit into OpenAI ChatGPT's token limit, I limited each document to roughly
1000 words. I wanted to make sure I could add the top two matches as context while
still having enough headroom for the actual question from the user.
A few notes:
- Context is not carried over from previous messages
- I "stole" the prompt that is used in LangChain (See `oai.py`). I tried some variations without much (subjective) improvement.
- A generalized normalizer format. This should make it fairly easy to use completely different data. Just add a new normalizer that implements the super class.
- Basic web front end with TailwindCSS

View File

@@ -16,15 +16,14 @@ def signal_handler(sig, frame):
def create_app():
global oai, vdb, plugins
global oai, plugins
from minyma.config import Config
import minyma.api.common as api_common
import minyma.api.v1 as api_v1
app = Flask(__name__)
vdb = ChromaDB(path.join(Config.DATA_PATH, "chroma"))
oai = OpenAIConnector(Config.OPENAI_API_KEY, vdb)
oai = OpenAIConnector(Config.OPENAI_API_KEY)
plugins = PluginLoader(Config)
app.register_blueprint(api_common.bp)
@@ -70,7 +69,7 @@ def normalize(filename, normalizer, database, datapath):
return print("INVALID NORMALIZER:", normalizer)
# Process Data
vdb.load_documents(norm)
vdb.load_documents(norm.name, norm)
signal.signal(signal.SIGINT, signal_handler)

View File

@@ -19,36 +19,17 @@ def get_response():
resp = minyma.oai.query(message)
# Derive LLM Data
# llm_resp = resp.get("llm", {})
# llm_choices = llm_resp.get("choices", [])
# Derive VDB Data
# vdb_resp = resp.get("vdb", {})
# combined_context = [{
# "id": vdb_resp.get("ids")[i],
# "distance": vdb_resp.get("distances")[i],
# "doc": vdb_resp.get("docs")[i],
# "metadata": vdb_resp.get("metadatas")[i],
# } for i, _ in enumerate(vdb_resp.get("docs", []))]
# Return Data
return resp
"""
Return the raw vector db related response
TODO - Embeds and loads data into the local ChromaDB.
{
"input": "string",
"normalizer": "string",
}
"""
@bp.route("/related", methods=["POST"])
def get_related():
data = request.get_json()
if not data:
return {"error": "Missing Message"}
message = str(data.get("message"))
if message == "":
return {"error": "Empty Message"}
related_documents = minyma.vdb.get_related(message)
return related_documents
bp.route("/embed", methods=["POST"])
def post_embeddings():
pass

View File

@@ -1,11 +1,12 @@
import os
def get_env(key, default=None, required=False) -> str:
def get_env(key, default=None, required=False) -> str | None:
"""Wrapper for gathering env vars."""
if required:
assert key in os.environ, "Missing Environment Variable: %s" % key
return str(os.environ.get(key, default))
env = os.environ.get(key, default)
return str(env) if env is not None else None
class Config:
@@ -19,7 +20,7 @@ class Config:
OpenAI API Key - Required
"""
DATA_PATH: str = get_env("DATA_PATH", default="./data")
OPENAI_API_KEY: str = get_env("OPENAI_API_KEY", required=True)
HOME_ASSISTANT_API_KEY: str = get_env("HOME_ASSISTANT_API_KEY", required=False)
HOME_ASSISTANT_URL: str = get_env("HOME_ASSISTANT_URL", required=False)
CHROMA_DATA_PATH: str | None = get_env("CHROMA_DATA_PATH", required=False)
HOME_ASSISTANT_API_KEY: str | None = get_env("HOME_ASSISTANT_API_KEY", required=False)
HOME_ASSISTANT_URL: str | None = get_env("HOME_ASSISTANT_URL", required=False)
OPENAI_API_KEY: str | None = get_env("OPENAI_API_KEY", required=True)

View File

@@ -18,6 +18,7 @@ class PubMedNormalizer(DataNormalizer):
normalized inside the iterator.
"""
def __init__(self, file: TextIOWrapper):
self.name = "pubmed"
self.file = file
self.length = 0

View File

@@ -1,32 +1,20 @@
import json
from textwrap import indent
from dataclasses import dataclass
from textwrap import indent
from typing import Any, List
import json
import minyma
import openai
from minyma.vdb import VectorDB
import minyma
# Stolen LangChain Prompt
PROMPT_TEMPLATE = """
Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to
make up an answer.
{context}
Question: {question}
Helpful Answer:
"""
INITIAL_PROMPT_TEMPLATE = """
You are a helpful assistant. You are connected to various external functions that can provide you with more personalized and up-to-date information and have already been granted the permissions to execute these functions at will. DO NOT say you don't have access to real time information, instead attempt to call one or more of the listed functions:
You are connected to various functions that can be used to answer the users questions. Your options are only "functions". Functions should be an array of strings containing the desired function calls (e.g. "function_name()").
Available Functions:
{functions}
The user will not see your response. You must only respond with a comma separated list of function calls: "FUNCTION_CALLS: function(), function(), etc". It must be prepended by "FUNCTION_CALLS:".
You must respond in JSON only with no other fluff or bad things will happen. The JSON keys must ONLY be "functions". Be sure to call the functions with the right arguments.
User Message: {question}
User Message: {message}
"""
FOLLOW_UP_PROMPT_TEMPLATE = """
@@ -34,7 +22,7 @@ You are a helpful assistant. This is a follow up message to provide you with mor
{response}
User Message: {question}
User Message: {message}
"""
@dataclass
@@ -46,14 +34,15 @@ class ChatCompletion:
choices: List[dict]
usage: dict
class OpenAIConnector:
def __init__(self, api_key: str, vdb: VectorDB):
self.vdb = vdb
def __init__(self, api_key: str):
self.model = "gpt-3.5-turbo"
self.word_cap = 1000
openai.api_key = api_key
def query(self, question: str) -> Any:
def query(self, message: str) -> Any:
# Track Usage
prompt_tokens = 0
completion_tokens = 0
@@ -63,7 +52,7 @@ class OpenAIConnector:
functions = "\n".join(list(map(lambda x: "- %s" % x["def"], minyma.plugins.plugin_defs().values())))
# Create Initial Prompt
prompt = INITIAL_PROMPT_TEMPLATE.format(question = question, functions = functions)
prompt = INITIAL_PROMPT_TEMPLATE.format(message = message, functions = indent(functions, ' ' * 2))
messages = [{"role": "user", "content": prompt}]
print("[OpenAIConnector] Running Initial OAI Query")
@@ -78,14 +67,7 @@ class OpenAIConnector:
print("[OpenAIConnector] No Results -> TODO", response)
content = response.choices[0]["message"]["content"]
# Get Called Functions (TODO - Better Validation -> Failback Prompt?)
all_funcs = list(
map(
lambda x: x.strip() if x.endswith(")") else x.strip() + ")",
content.split("FUNCTION_CALLS:")[1].strip().split("),")
)
)
all_funcs = json.loads(content).get("functions")
# Update Usage
prompt_tokens += response.usage.get("prompt_tokens", 0)
@@ -94,19 +76,33 @@ class OpenAIConnector:
print("[OpenAIConnector] Completed Initial OAI Query:\n", indent(json.dumps({ "usage": response.usage, "function_calls": all_funcs }, indent=2), ' ' * 2))
# Execute Requested Functions
func_responses = {}
for func in all_funcs:
func_responses[func] = minyma.plugins.execute(func)
# Build Response Text & Metadata
func_metadata = {}
func_response = []
# Build Response Text
response_content_arr = []
for key, val in func_responses.items():
response_content_arr.append("- %s\n%s" % (key, val))
response_content = "\n".join(response_content_arr)
for func in all_funcs:
# Execute Requested Function
resp = minyma.plugins.execute(func)
# Unknown Response
if resp is None:
print("[OpenAIConnector] Invalid Function Response: %s" % func)
continue
# Get Response
content = resp.get("content")
metadata = resp.get("metadata")
error = resp.get("error")
# Append Responses & Metadata
indented_val = indent(content or error or "Unknown Error", ' ' * 2)
func_response.append("- %s\n%s" % (func, indented_val))
func_metadata[func] = { "metadata": metadata, "error": error }
func_response = "\n".join(func_response)
# Create Follow Up Prompt
prompt = FOLLOW_UP_PROMPT_TEMPLATE.format(question = question, response = response_content)
prompt = FOLLOW_UP_PROMPT_TEMPLATE.format(message = message, response = func_response)
messages = [{"role": "user", "content": prompt}]
print("[OpenAIConnector] Running Follup Up OAI Query")
@@ -130,33 +126,10 @@ class OpenAIConnector:
# Return Response
return {
"response": content,
"functions": func_responses,
"functions": func_metadata,
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens
}
}
def old_query(self, question: str) -> Any:
# Get related documents from vector db
related = self.vdb.get_related(question)
# Validate results
all_docs = related.get("docs", [])
if len(all_docs) == 0:
return { "error": "No Context Found" }
# Join on new line (cap @ word limit), generate main prompt
reduced_docs = list(map(lambda x: " ".join(x.split()[:self.word_cap]), all_docs))
context = '\n'.join(reduced_docs)
prompt = PROMPT_TEMPLATE.format(context = context, question = question)
# Query OpenAI ChatCompletion
response = openai.ChatCompletion.create(
model=self.model,
messages=[{"role": "user", "content": prompt}]
)
# Return Response
return { "llm": response, "vdb": related }

View File

@@ -38,7 +38,6 @@ class PluginLoader:
for func_obj in plugin.functions:
func_name = func_obj.__name__
function_name = "%s_%s" % (plugin_name, func_name)
signature = inspect.signature(func_obj)
params = list(
@@ -48,8 +47,12 @@ class PluginLoader:
)
)
func_def = "%s(%s)" % (function_name, ", ".join(params))
defs[function_name] = { "func": func_obj, "def": func_def }
if func_name in defs:
print("[PluginLoader] Error: Duplicate Function: (%s) %s" % (plugin_name, func_name))
continue
func_def = "%s(%s)" % (func_name, ", ".join(params))
defs[func_name] = { "func": func_obj, "def": func_def }
return defs

View File

@@ -0,0 +1,53 @@
from textwrap import indent
from minyma.plugin import MinymaPlugin
from minyma.vdb import ChromaDB
class ChromaDBPlugin(MinymaPlugin):
"""Perform Local VectorDB Lookup
ChromDB can access multiple "collections". You can add additional functions
here that just access a different collection (i.e. different data)
"""
def __init__(self, config):
self.name = "chroma_db"
self.config = config
self.word_cap = 1000
if config.CHROMA_DATA_PATH is None:
self.functions = []
else:
self.vdb = ChromaDB(config.CHROMA_DATA_PATH)
self.functions = [self.lookup_pubmed_data]
def __lookup_data(self, collection_name: str, query: str):
# Get Related
related = self.vdb.get_related(collection_name, query)
# Get Metadata
metadata = [{
"id": related.get("ids")[i],
"distance": related.get("distances")[i],
"metadata": related.get("metadatas")[i],
} for i, _ in enumerate(related.get("docs", []))]
# Normalize Data
return list(
map(
lambda x: " ".join(x.split()[:self.word_cap]),
related.get("docs", [])
)
), metadata
def lookup_pubmed_data(self, query: str):
COLLECTION_NAME = "pubmed"
documents, metadata = self.__lookup_data(COLLECTION_NAME, query)
context = '\n'.join(documents)
return {
"content": context,
"metadata": metadata,
"error": None
}

View File

@@ -14,13 +14,14 @@ class DuckDuckGoPlugin(MinymaPlugin):
def __init__(self, config):
self.config = config
self.name = "duck_duck_go"
self.functions = [self.search]
self.functions = [self.search_duck_duck_go]
def search(self, query: str):
def search_duck_duck_go(self, query: str):
"""Search DuckDuckGo"""
resp = requests.get("https://html.duckduckgo.com/html/?q=%s" % query, headers=HEADERS)
soup = BeautifulSoup(resp.text, features="html.parser")
# Get Results
results = []
for item in soup.select(".result > div"):
title_el = item.select_one(".result__title > a")
@@ -31,4 +32,18 @@ class DuckDuckGoPlugin(MinymaPlugin):
results.append({"title": title, "description": description})
return json.dumps(results[:5])
# Derive Metadata (Title)
metadata = {
"titles": list(
map(
lambda x: x.get("title"),
results[:5]
)
)
}
return {
"content": json.dumps(results[:5]),
"metadata": metadata,
"error": None
}

View File

@@ -9,20 +9,17 @@ class HomeAssistantPlugin(MinymaPlugin):
def __init__(self, config):
self.config = config
self.name = "home_automation"
self.name = "home_assistant"
self.functions = []
if not config.HOME_ASSISTANT_API_KEY or not config.HOME_ASSISTANT_URL:
if config.HOME_ASSISTANT_API_KEY and config.HOME_ASSISTANT_URL:
self.functions = [self.home_automation_command]
if not config.HOME_ASSISTANT_API_KEY:
print("[HomeAssistantPlugin] Missing HOME_ASSISTANT_API_KEY")
if not config.HOME_ASSISTANT_URL:
print("[HomeAssistantPlugin] Missing HOME_ASSISTANT_URL")
self.functions = []
else:
self.functions = [self.command]
def command(self, natural_language_command: str):
def home_automation_command(self, natural_language_command: str):
url = urllib.parse.urljoin(self.config.HOME_ASSISTANT_URL, "/api/conversation/process")
headers = {
"Authorization": "Bearer %s" % self.config.HOME_ASSISTANT_API_KEY,
@@ -34,6 +31,17 @@ class HomeAssistantPlugin(MinymaPlugin):
# Parse JSON
try:
return json.dumps(resp.json())
r = resp.json()
text = r["response"]["speech"]["plain"]["speech"]
return {
"content": text,
"metadata": r,
"error": None
}
except requests.JSONDecodeError:
return json.dumps({ "error": "Command Failed" })
return {
"content": None,
"metadata": None,
"error": "Command Failed"
}

View File

@@ -14,7 +14,7 @@ class VehicleLookupPlugin(MinymaPlugin):
def __init__(self, config):
self.config = config
self.name = "vehicle_state_plate"
self.functions = [self.lookup]
self.functions = [self.lookup_vehicle_by_state_plate]
def __query_api(self, url, json=None, headers=None):
# Perform Request
@@ -39,7 +39,7 @@ class VehicleLookupPlugin(MinymaPlugin):
return None, text, error
def lookup(self, state_abbreviation: str, licence_plate: str):
def lookup_vehicle_by_state_plate(self, state_abbreviation: str, licence_plate: str):
CARVANA_URL = (
"https://apim.carvana.io/trades/api/v5/vehicleconfiguration/plateLookup/%s/%s"
% (state_abbreviation, licence_plate)
@@ -50,10 +50,11 @@ class VehicleLookupPlugin(MinymaPlugin):
# Invalid JSON
if json_resp is None:
return json.dumps({
return{
"content": None,
"metadata": text_resp,
"error": error,
"response": text_resp,
})
}
try:
# Check Result
@@ -63,7 +64,11 @@ class VehicleLookupPlugin(MinymaPlugin):
error = "No Results"
else:
error = "API Error: %s" % status_resp
return {"error": error, "response": text_resp}
return {
"content": None,
"metadata": json_resp,
"error": error,
}
# Parse Result
vehicle_info = json_resp.get("content")
@@ -74,17 +79,20 @@ class VehicleLookupPlugin(MinymaPlugin):
trim = vehicle_info.get("vehicles")[0].get("trim")
except Exception as e:
return json.dumps({
return {
"content": None,
"metadata": text_resp,
"error": "Unknown Error: %s" % e,
"response": text_resp,
})
}
return json.dumps({
"result": {
return {
"content": json.dumps({
"vin": vin,
"year": year,
"make": make,
"model": model,
"trim": trim,
},
})
}),
"metadata": json_resp,
"error": None
}

53
minyma/plugins/youtube.py Normal file
View File

@@ -0,0 +1,53 @@
import os
from yt_dlp import YoutubeDL
import xml.etree.ElementTree as ET
from minyma.plugin import MinymaPlugin
class YouTubePlugin(MinymaPlugin):
"""Transcribe YouTube Video"""
def __init__(self, config):
self.config = config
self.name = "youtube"
self.functions = [self.transcribe_youtube]
def transcribe_youtube(self, youtube_video_id: str):
URLS = [youtube_video_id]
vid = YoutubeDL({
"skip_download": True,
"writesubtitles": True,
"writeautomaticsub": True,
"subtitleslangs": ["en"],
"subtitlesformat": "ttml",
"outtmpl": "transcript"
})
vid.download(URLS)
content = self.convert_ttml_to_plain_text("transcript.en.ttml")
os.remove("transcript.en.ttml")
return {
"content": content,
"metadata": URLS,
"error": "TTML Conversion Error" if content is None else None
}
def convert_ttml_to_plain_text(self, ttml_file_path):
try:
# Parse the TTML file
tree = ET.parse(ttml_file_path)
root = tree.getroot()
# Process Text
plain_text = ""
for elem in root.iter():
if elem.text:
plain_text += elem.text + " "
return plain_text.strip()
except ET.ParseError as e:
print("[YouTubePlugin] TTML Conversion Error:", e)
return None

View File

@@ -18,11 +18,11 @@ def chunk(iterable, chunk_size: int):
VectorDB Interface
"""
class VectorDB:
def load_documents(self, normalizer: DataNormalizer):
pass
def load_documents(self, name: str, normalizer: DataNormalizer, chunk_size: int = 10):
raise NotImplementedError("VectorDB must implement load_documents")
def get_related(self, question: str) -> Any:
pass
def get_related(self, name: str, question: str) -> Any:
raise NotImplementedError("VectorDB must implement get_related")
"""
ChromaDV VectorDB Type
@@ -31,12 +31,13 @@ class ChromaDB(VectorDB):
def __init__(self, path: str):
self.client: API = chromadb.PersistentClient(path=path)
self.word_cap = 2500
self.collection_name: str = "vdb"
self.collection: chromadb.Collection = self.client.create_collection(name=self.collection_name, get_or_create=True)
def get_related(self, question: str) -> Any:
def get_related(self, name: str, question: str) -> Any:
# Get or Create Collection
collection = chromadb.Collection = self.client.create_collection(name=name, get_or_create=True)
"""Returns line separated related docs"""
results = self.collection.query(
results = collection.query(
query_texts=[question.lower()],
n_results=2
)
@@ -53,7 +54,11 @@ class ChromaDB(VectorDB):
"ids": all_ids
}
def load_documents(self, normalizer: DataNormalizer, chunk_size: int = 10):
def load_documents(self, name: str, normalizer: DataNormalizer, chunk_size: int = 10):
# Get or Create Collection
collection = chromadb.Collection = self.client.create_collection(name=name, get_or_create=True)
# Load Items
length = len(normalizer) / chunk_size
for items in tqdm(chunk(normalizer, chunk_size), total=length):
ids = []
@@ -65,7 +70,7 @@ class ChromaDB(VectorDB):
ids.append(item.get("id"))
metadatas.append(item.get("metadata", {}))
self.collection.add(
collection.add(
ids=ids,
documents=documents,
metadatas=metadatas,

View File

@@ -16,7 +16,8 @@ dependencies = [
"chromadb",
"sqlite-utils",
"click",
"beautifulsoup4"
"beautifulsoup4",
"yt-dlp"
]
[project.scripts]