This file is a merged representation of the entire codebase, combined into a single document by Repomix.
The content has been processed as follows: empty lines have been removed, content has been formatted for parsing in plain style, content has been compressed (code blocks are separated by the ⋮---- delimiter), and the security check has been disabled.
================================================================
File Summary
================================================================
Purpose:
--------
This file contains a packed representation of the entire repository's contents.
It is designed to be easily consumable by AI systems for analysis, code review,
or other automated processes.
File Format:
------------
The content is organized as follows:
1. This summary section
2. Repository information
3. Directory structure
4. Repository files (if enabled)
5. Multiple file entries, each consisting of:
a. A separator line (================)
b. The file path (File: path/to/file)
c. Another separator line
d. The full contents of the file
e. A blank line
Usage Guidelines:
-----------------
- This file should be treated as read-only. Any changes should be made to the
original repository files, not this packed version.
- When processing this file, use the file path to distinguish
between different files in the repository.
- Be aware that this file may contain sensitive information. Handle it with
the same level of security as you would the original repository.
Notes:
------
- Some files may have been excluded based on .gitignore rules and Repomix's configuration
- Binary files are not included in this packed representation. Please refer to the Directory Structure section for a complete list of file paths, including binary files
- Files matching patterns in .gitignore are excluded
- Files matching default ignore patterns are excluded
- Empty lines have been removed from all files
- Content has been formatted for parsing in plain style
- Content has been compressed - code blocks are separated by ⋮---- delimiter
- Security check has been disabled - content may contain sensitive information
- Files are sorted by Git change count (files with more changes are at the bottom)
================================================================
Directory Structure
================================================================
.github/
actions/
uv_setup/
action.yml
workflows/
_test.yml
ci.yml
langconnect/
api/
__init__.py
collections.py
documents.py
database/
collections.py
connection.py
models/
__init__.py
collection.py
document.py
services/
__init__.py
document_processor.py
__init__.py
__main__.py
auth.py
config.py
server.py
tests/
unit_tests/
conftest.py
fixtures.py
test_collections_api.py
test_documents_api.py
test_imports.py
.dockerignore
.env.example
.gitignore
docker-compose.test.yml
docker-compose.yml
Dockerfile
INSTRUCTIONS.md
LICENSE
Makefile
pyproject.toml
README.md
================================================================
Files
================================================================
================
File: .github/actions/uv_setup/action.yml
================
# TODO: https://docs.astral.sh/uv/guides/integration/github/#caching
name: uv-install
description: Set up Python and uv
inputs:
python-version:
description: Python version, supporting MAJOR.MINOR only
required: true
env:
UV_VERSION: "0.7.3"
runs:
using: composite
steps:
- name: Install uv and set the python version
uses: astral-sh/setup-uv@v5
with:
version: ${{ env.UV_VERSION }}
python-version: ${{ inputs.python-version }}
================
File: .github/workflows/_test.yml
================
name: test
on:
workflow_call:
inputs:
working-directory:
required: true
type: string
description: "From which folder this pipeline executes"
python-version:
required: true
type: string
description: "Python version to use"
env:
UV_FROZEN: "true"
UV_NO_SYNC: "true"
jobs:
build:
services:
postgres:
# ensure this postgres version stays in sync with the prod database
# and with the postgres version used in docker compose
# Testing with postgres that has the pg vector extension
image: pgvector/pgvector:pg16
env:
# optional (defaults to `postgres`)
POSTGRES_DB: langchain_test
# required
POSTGRES_PASSWORD: langchain
# optional (defaults to `5432`)
POSTGRES_PORT: 5432
# optional (defaults to `postgres`)
POSTGRES_USER: langchain
ports:
# maps tcp port 5432 on service container to the host
- 5432:5432
# set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 3s
--health-timeout 5s
--health-retries 10
defaults:
run:
working-directory: ${{ inputs.working-directory }}
runs-on: ubuntu-latest
timeout-minutes: 20
name: "make test #${{ inputs.python-version }}"
steps:
- name: Test database connection
run: |
# Set up postgresql-client
sudo apt-get install -y postgresql-client
# Test psql connection
psql -h localhost -p 5432 -U langchain -d langchain_test -c "SELECT 1;"
env:
# postgres password is required; alternatively, you can run:
# `PGPASSWORD=postgres_password psql ...`
PGPASSWORD: langchain
- uses: actions/checkout@v4
- name: Set up Python ${{ inputs.python-version }} + uv
uses: "./.github/actions/uv_setup"
id: setup-python
with:
python-version: ${{ inputs.python-version }}
- name: Install dependencies
shell: bash
run: uv sync --group dev
- name: Run core tests
shell: bash
run: |
make test
- name: Run Integration tests
# Only run this if the working-directory is the server
if: ${{ inputs.working-directory == './libs/server' }}
shell: bash
run: |
make test_integration
================
File: .github/workflows/ci.yml
================
# .github/workflows/ci.yml
name: CI Checks
on:
pull_request:
push:
branches: [ main ]
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
env:
UV_FROZEN: "true"
UV_NO_SYNC: "true"
jobs:
format:
# Delete me after updating repo configuration
runs-on: ubuntu-latest
steps:
- name: No-op
run: echo "1"
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11' # Or specify your project's Python version
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install ruff
# If you have other dependencies needed for ruff (e.g., plugins via pyproject.toml)
# install them here, e.g., pip install -r requirements.txt or pip install .
- name: Check linting
run: make lint
test:
strategy:
matrix:
python-version:
- "3.11"
uses:
./.github/workflows/_test.yml
with:
working-directory: "."
python-version: ${{ matrix.python-version }}
secrets: inherit
================
File: langconnect/api/__init__.py
================
__all__ = ["collections_router", "documents_router"]
================
File: langconnect/api/collections.py
================
router = APIRouter(prefix="/collections", tags=["collections"])
⋮----
"""Creates a new PGVector collection by name with optional metadata."""
collection_info = await CollectionsManager(user.identity).create(
⋮----
@router.get("", response_model=list[CollectionResponse])
async def collections_list(user: Annotated[AuthenticatedUser, Depends(resolve_user)])
⋮----
"""Lists all available PGVector collections (name and UUID)."""
⋮----
"""Retrieves details (name and UUID) of a specific PGVector collection."""
collection = await CollectionsManager(user.identity).get(str(collection_id))
⋮----
"""Deletes a specific PGVector collection by name."""
⋮----
"""Updates a specific PGVector collection's name and/or metadata."""
updated_collection = await CollectionsManager(user.identity).update(
================
File: langconnect/api/documents.py
================
# Create a TypeAdapter that enforces “list of dict”
_metadata_adapter = TypeAdapter(list[dict[str, Any]])
logger = logging.getLogger(__name__)
router = APIRouter(tags=["documents"])
⋮----
"""Processes and indexes (adds) new document files with optional metadata."""
# If no metadata JSON is provided, fill with None
⋮----
metadatas: list[dict] | list[None] = [None] * len(files)
⋮----
# This will both parse the JSON and check the Python types
# (i.e. that it's a list, and every item is a dict)
metadatas = _metadata_adapter.validate_json(metadatas_json)
⋮----
# Pydantic errors include exactly what went wrong
⋮----
# Now just check that the list length matches
⋮----
docs_to_index: list[Document] = []
processed_files_count = 0
failed_files = []
# Pair files with their corresponding metadata
⋮----
# Pass metadata to process_document
langchain_docs = await process_document(file, metadata=metadata)
⋮----
# Decide if this constitutes a failure
# failed_files.append(file.filename)
⋮----
# Log the error and the file that caused it
⋮----
# Decide on behavior: continue processing others or fail fast?
# For now, let's collect failures and report them, but continue processing.
# If after processing all files, none yielded documents, raise error
⋮----
error_detail = "Failed to process any documents from the provided files."
⋮----
# If some files failed but others succeeded, proceed with adding successful ones
# but maybe inform the user about the failures.
⋮----
collection = Collection(
added_ids = await collection.upsert(docs_to_index)
⋮----
# This might indicate a problem with the vector store itself
⋮----
# Construct response message
success_message = (
response_data = {
⋮----
# Consider if partial success should change the overall status/message
⋮----
# Reraise HTTPExceptions from add_documents_to_vectorstore or previous checks
⋮----
# Handle exceptions during the vector store addition process
⋮----
"""Lists documents within a specific collection."""
⋮----
"""Deletes a specific document from a collection by its ID."""
⋮----
# TODO(Eugene): Deletion logic does not look correct.
# Should I be deleting by ID or file ID?
success = await collection.delete(file_id=document_id)
⋮----
"""Search for documents within a specific collection."""
⋮----
results = await collection.search(
================
File: langconnect/database/collections.py
================
"""Module defines CollectionManager and Collection classes.
1. CollectionManager: for managing collections of documents in a database.
2. Collection: for managing the contents of a specific collection.
The current implementations are based on langchain-postgres PGVector class.
Replace with your own implementation or favorite vectorstore if needed.
"""
⋮----
logger = logging.getLogger(__name__)
class CollectionDetails(TypedDict)
⋮----
"""TypedDict for collection details."""
uuid: str
name: str
metadata: dict[str, Any]
# Temporary field used internally to workaround an issue with PGVector
table_id: NotRequired[str]
class CollectionsManager
⋮----
"""Use to create, delete, update, and list document collections."""
def __init__(self, user_id: str) -> None
⋮----
"""Initialize the collection manager with a user ID."""
⋮----
@staticmethod
async def setup() -> None
⋮----
"""Set up method should run any necessary initialization code.
For example, it could run SQL migrations to create the necessary tables.
"""
⋮----
"""List all collections owned by the given user, ordered by logical name."""
⋮----
records = await conn.fetch(
result: list[CollectionDetails] = []
⋮----
metadata = json.loads(r["cmetadata"])
name = metadata.pop("name", "Unnamed")
⋮----
"""Fetch a single collection by UUID, ensuring the user owns it."""
⋮----
rec = await conn.fetchrow(
⋮----
metadata = json.loads(rec["cmetadata"])
⋮----
"""Create a new collection.
Args:
collection_name: The name of the new collection.
metadata: Optional metadata for the collection.
Returns:
Details of the created collection or None if creation failed.
"""
# check for existing name
metadata = metadata.copy() if metadata else {}
⋮----
# For now just assign a random table id
table_id = str(uuid.uuid4())
# triggers PGVector to create both the vectorstore and DB entry
⋮----
# Fetch the newly created table.
⋮----
name = metadata.pop("name")
⋮----
"""Update collection metadata.
Four cases:
1) metadata only → merge in metadata, keep old JSON->'name'
2) metadata + new name → merge metadata (including new 'name')
3) new name only → jsonb_set the 'name' key
4) neither → no-op, just fetch & return
"""
# Case 4: no-op
⋮----
# Case 1 & 2: metadata supplied (with or without new name)
⋮----
# merge in owner_id + optional new name
merged = metadata.copy()
⋮----
# pull existing friendly name so we don't lose it
existing = await self.get(collection_id)
⋮----
metadata_json = json.dumps(merged)
⋮----
# Case 3: name only
else: # metadata is None but name is not None
⋮----
full_meta = json.loads(rec["cmetadata"])
friendly_name = full_meta.pop("name", "Unnamed")
⋮----
"""Delete a collection by UUID.
Returns number of rows deleted (1).
Raises 404 if no such collection.
"""
⋮----
result = await conn.execute(
⋮----
class Collection
⋮----
"""A collection of documents.
Use to add, delete, list, and search documents to a given collection.
"""
def __init__(self, collection_id: str, user_id: str) -> None
⋮----
"""Initialize the collection by collection ID."""
⋮----
async def _get_details_or_raise(self) -> dict[str, Any]
⋮----
"""Get collection details if it exists, otherwise raise an error."""
details = await CollectionsManager(self.user_id).get(self.collection_id)
⋮----
async def upsert(self, documents: list[Document]) -> list[str]
⋮----
"""Add one or more documents to the collection."""
details = await self._get_details_or_raise()
store = get_vectorstore(collection_name=details["table_id"])
added_ids = store.add_documents(documents)
⋮----
"""Delete embeddings by file id.
A file id identifies the original file from which the chunks were generated.
"""
⋮----
delete_sql = """
# Params: collection UUID, user ID, file ID
⋮----
# result is like "DELETE 3"
deleted_count = int(result.split()[-1])
⋮----
# For now if deleted count is 0, let's verify that the collection exists.
⋮----
async def list(self, *, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]
⋮----
"""List one representative chunk per file (unique file_id) in this collection."""
⋮----
rows = await conn.fetch(
docs: list[dict[str, Any]] = []
⋮----
metadata = json.loads(r["cmetadata"]) if r["cmetadata"] else {}
⋮----
# For now, if no documents, let's check that the collection exists.
# It may make sense to consider this a 200 OK with empty list.
# And make sure its user responsibility to check that the collection
# exists.
⋮----
async def get(self, document_id: str) -> dict[str, Any]
⋮----
"""Fetch a single chunk by its UUID, verifying collection ownership."""
⋮----
row = await conn.fetchrow(
⋮----
metadata = json.loads(row["cmetadata"]) if row["cmetadata"] else {}
⋮----
"""Run a semantic similarity search in the vector store.
Note: offset is applied client-side after retrieval.
"""
⋮----
results = store.similarity_search_with_score(query, k=limit)
================
File: langconnect/database/connection.py
================
logger = logging.getLogger(__name__)
_pool: asyncpg.Pool | None = None
async def get_db_pool() -> asyncpg.Pool
⋮----
"""Get the pg connection pool."""
⋮----
# Use parsed components for asyncpg connection
_pool = await asyncpg.create_pool(
⋮----
async def close_db_pool()
⋮----
"""Close the pg connection pool."""
⋮----
_pool = None
⋮----
@asynccontextmanager
async def get_db_connection() -> AsyncGenerator[asyncpg.Connection, None]
⋮----
"""Get a connection from the pool."""
pool = await get_db_pool()
⋮----
"""Creates and returns a sync SQLAlchemy engine for PostgreSQL."""
connection_string = f"postgresql+psycopg://{user}:{password}@{host}:{port}/{dbname}"
engine = create_engine(connection_string)
⋮----
DBConnection = Union[sqlalchemy.engine.Engine, str]
⋮----
"""Initializes and returns a PGVector store for a specific collection,
using an existing engine or creating one from connection parameters.
"""
⋮----
engine = get_vectorstore_engine()
store = PGVector(
================
File: langconnect/models/__init__.py
================
__all__ = [
================
File: langconnect/models/collection.py
================
# =====================
# Collection Schemas
⋮----
class CollectionCreate(BaseModel)
⋮----
"""Schema for creating a new collection."""
name: str = Field(..., description="The unique name of the collection.")
metadata: dict[str, Any] = Field(
class CollectionUpdate(BaseModel)
⋮----
"""Schema for updating an existing collection."""
name: str | None = Field(None, description="New name for the collection.")
metadata: dict[str, Any] | None = Field(
class CollectionResponse(BaseModel)
⋮----
"""Schema for representing a collection from PGVector."""
# PGVector table has uuid (id), name (str), and cmetadata (JSONB)
# We get these from list/get db functions
uuid: str = Field(
name: str = Field(..., description="The name of the collection.")
⋮----
class Config
⋮----
# Allows creating model from dict like
# {'uuid': '...', 'name': '...', 'metadata': {...}}
from_attributes = True
⋮----
# Document Schemas
⋮----
class DocumentBase(BaseModel)
⋮----
page_content: str
metadata: dict[str, Any] = Field(default_factory=dict)
class DocumentCreate(DocumentBase)
⋮----
collection_id: str
embedding: list[float] | None = (
⋮----
None # Embedding can be added during creation or later
⋮----
class DocumentUpdate(BaseModel)
⋮----
page_content: str | None = None
metadata: dict[str, Any] | None = None
embedding: list[float] | None = None
class DocumentResponse(DocumentBase)
⋮----
id: str
⋮----
embedding: list[float] | None = None # Represent embedding as list of floats
created_at: datetime.datetime
updated_at: datetime.datetime
⋮----
orm_mode = True
from_attributes = True # Pydantic v2 way
================
File: langconnect/models/document.py
================
class DocumentCreate(BaseModel)
⋮----
content: str | None = None
metadata: dict[str, Any] | None = None
class DocumentUpdate(BaseModel)
class DocumentResponse(BaseModel)
⋮----
id: str
collection_id: str
⋮----
created_at: str | None = None
updated_at: str | None = None
class SearchQuery(BaseModel)
⋮----
query: str
limit: int | None = 10
filter: dict[str, Any] | None = None
class SearchResult(BaseModel)
⋮----
page_content: str
⋮----
score: float
================
File: langconnect/services/__init__.py
================
__all__ = ["SUPPORTED_MIMETYPES", "process_document"]
================
File: langconnect/services/document_processor.py
================
LOGGER = logging.getLogger(__name__)
# Document Parser Configuration
HANDLERS = {
SUPPORTED_MIMETYPES = sorted(HANDLERS.keys())
MIMETYPE_BASED_PARSER = MimeTypeBasedParser(
# Text Splitter
TEXT_SPLITTER = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
⋮----
"""Process an uploaded file into LangChain documents."""
# Generate a unique ID for this file processing instance
file_id = uuid.uuid4()
contents = await file.read()
blob = Blob(data=contents, mimetype=file.content_type or "text/plain")
docs = MIMETYPE_BASED_PARSER.parse(blob)
# Add provided metadata to each document
⋮----
# Ensure metadata attribute exists and is a dict
⋮----
# Update with provided metadata, preserving existing keys if not overridden
⋮----
# Split documents
split_docs = TEXT_SPLITTER.split_documents(docs)
# Add the generated file_id to all split documents' metadata
⋮----
split_doc.metadata = {} # Initialize if it doesn't exist
⋮----
) # Store as string for compatibility
================
File: langconnect/__init__.py
================
"""LangConnect: A RAG service using FastAPI and LangChain."""
⋮----
__version__ = "0.0.1"
================
File: langconnect/__main__.py
================
================
File: langconnect/auth.py
================
"""Auth to resolve user object."""
⋮----
security = HTTPBearer()
class AuthenticatedUser(BaseUser)
⋮----
"""An authenticated user following the Starlette authentication model."""
def __init__(self, user_id: str, display_name: str) -> None
⋮----
"""Initialize the AuthenticatedUser.
Args:
user_id: Unique identifier for the user.
display_name: Display name for the user.
"""
⋮----
@property
def is_authenticated(self) -> bool
⋮----
"""Return True if the user is authenticated."""
⋮----
@property
def display_name(self) -> str
⋮----
"""Return the display name of the user."""
⋮----
@property
def identity(self) -> str
⋮----
"""Return the identity of the user. This is a unique identifier."""
⋮----
def get_current_user(authorization: str) -> User
⋮----
"""Authenticate a user by validating their JWT token against Supabase.
This function verifies the provided JWT token by making a request to Supabase.
It requires the SUPABASE_URL and SUPABASE_KEY environment variables to be
properly configured.
Args:
authorization: JWT token string to validate
Returns:
User: A Supabase User object containing the authenticated user's information
Raises:
HTTPException: With status code 500 if Supabase configuration is missing
HTTPException: With status code 401 if token is invalid or authentication fails
"""
supabase = create_client(config.SUPABASE_URL, config.SUPABASE_KEY)
response = supabase.auth.get_user(authorization)
user = response.user
⋮----
"""Resolve user from the credentials."""
⋮----
user = get_current_user(credentials.credentials)
================
File: langconnect/config.py
================
env = Config()
IS_TESTING = env("IS_TESTING", cast=str, default="").lower() == "true"
⋮----
SUPABASE_URL = ""
SUPABASE_KEY = ""
⋮----
SUPABASE_URL = env("SUPABASE_URL", cast=str, default=undefined)
SUPABASE_KEY = env("SUPABASE_KEY", cast=str, default=undefined)
def get_embeddings() -> Embeddings
⋮----
"""Get the embeddings instance based on the environment."""
⋮----
DEFAULT_EMBEDDINGS = get_embeddings()
DEFAULT_COLLECTION_NAME = "default_collection"
# Database configuration
POSTGRES_HOST = env("POSTGRES_HOST", cast=str, default="localhost")
POSTGRES_PORT = env("POSTGRES_PORT", cast=int, default="5432")
POSTGRES_USER = env("POSTGRES_USER", cast=str, default="langchain")
POSTGRES_PASSWORD = env("POSTGRES_PASSWORD", cast=str, default="langchain")
POSTGRES_DB = env("POSTGRES_DB", cast=str, default="langchain_test")
# Read allowed origins from environment variable
ALLOW_ORIGINS_JSON = env("ALLOW_ORIGINS", cast=str, default="")
⋮----
ALLOWED_ORIGINS = json.loads(ALLOW_ORIGINS_JSON.strip())
⋮----
ALLOWED_ORIGINS = "http://localhost:3000"
================
File: langconnect/server.py
================
# Configure logging
⋮----
logger = logging.getLogger(__name__)
# Initialize FastAPI app
⋮----
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]
⋮----
"""Lifespan context manager for FastAPI application."""
⋮----
APP = FastAPI(
# Add CORS middleware
⋮----
# Include API routers
⋮----
@APP.get("/health")
async def health_check() -> dict
⋮----
"""Health check endpoint."""
================
File: tests/unit_tests/conftest.py
================
@pytest.fixture(scope="session")
def event_loop()
⋮----
"""Create a single asyncio event loop for the entire test session,
and only close it once at the very end.
This overrides pytest-asyncio's default event_loop fixture.
"""
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
================
File: tests/unit_tests/fixtures.py
================
def reset_db() -> None
⋮----
"""Hacky code to initialize the database. This needs to be fixed."""
⋮----
vectorstore = get_vectorstore()
# Drop table
⋮----
# Re-create
⋮----
@asynccontextmanager
async def get_async_test_client() -> AsyncGenerator[AsyncClient, None]
⋮----
"""Get an async client."""
url = "http://localhost:9999"
transport = ASGITransport(
⋮----
async_client = AsyncClient(base_url=url, transport=transport)
================
File: tests/unit_tests/test_collections_api.py
================
USER_1_HEADERS = {
USER_2_HEADERS = {
NO_SUCH_USER_HEADERS = {
async def test_health() -> None
⋮----
"""Test the health check endpoint."""
⋮----
response = await client.get("/health")
⋮----
async def test_create_and_get_collection() -> None
⋮----
"""Test creating and retrieving a collection."""
⋮----
payload = {"name": "test_collection", "metadata": {"purpose": "unit-test"}}
response = await client.post(
⋮----
data = response.json()
⋮----
# Get collection by ID
get_response = await client.get(
⋮----
# Test without metadata
payload_no_metadata = {"name": "test_collection_no_metadata"}
response_no_metadata = await client.post(
⋮----
data_no_metadata = response_no_metadata.json()
⋮----
async def test_create_and_list_collection() -> None
⋮----
"""Test creating and listing a collection."""
⋮----
# List collections
list_response = await client.get("/collections", headers=USER_1_HEADERS)
⋮----
collections = list_response.json()
⋮----
async def test_create_collections_with_identical_names() -> None
⋮----
"""Test that collections with identical names can be created."""
⋮----
payload = {"name": "dup_collection", "metadata": {"foo": "bar"}}
# first create
r1 = await client.post("/collections", json=payload, headers=USER_1_HEADERS)
⋮----
# second create with same name
r2 = await client.post("/collections", json=payload, headers=USER_1_HEADERS)
⋮----
async def test_create_collection_requires_auth() -> None
⋮----
"""POST /collections without a valid token should be 401."""
⋮----
payload = {"name": "no_auth", "metadata": {}}
r = await client.post("/collections", json=payload)
⋮----
r2 = await client.post(
⋮----
async def test_get_nonexistent_collection() -> None
⋮----
"""GET a collection that doesn't exist should be 404."""
⋮----
r = await client.get("/collections/nonexistent", headers=USER_1_HEADERS)
# Not a UUID, so should be 422
⋮----
r = await client.get(
⋮----
async def test_delete_collection_and_nonexistent() -> None
⋮----
"""DELETE removes an existing collection and returns 404 on missing."""
⋮----
# create first
payload = {"name": "to_delete", "metadata": {"foo": "bar"}}
⋮----
# Get the UUID first
get_collection = await client.get("/collections", headers=USER_1_HEADERS)
⋮----
collections = get_collection.json()
collection_id = next(
⋮----
# delete it by ID
r2 = await client.delete(
⋮----
# Try to get it again by ID
r3 = await client.get(f"/collections/{collection_id}", headers=USER_1_HEADERS)
⋮----
# Deletion is idempotent
r4 = await client.delete(
⋮----
async def test_patch_collection() -> None
⋮----
"""PATCH should update metadata properly."""
⋮----
# create a collection
payload = {"name": "colA", "metadata": {"a": 1}}
r = await client.post("/collections", json=payload, headers=USER_1_HEADERS)
⋮----
# Get the UUID for colA
⋮----
# update metadata using the UUID
r2 = await client.patch(
⋮----
async def test_update_collection_name_and_metadata() -> None
⋮----
"""PATCH should rename and/or update metadata properly."""
⋮----
# create two collections
⋮----
col_a_id = next((c["uuid"] for c in collections if c["name"] == "colA"), None)
⋮----
# try renaming colA to colB (conflict)
no_conflict = await client.patch(
⋮----
"uuid": col_a_id, # The ID should not change
⋮----
# rename colA to colC with new metadata (using the UUID we got earlier)
update = await client.patch(
⋮----
body = update.json()
⋮----
# ensure we can get by the ID
get_by_id = await client.get(f"/collections/{col_a_id}", headers=USER_1_HEADERS)
⋮----
# the ID should remain the same even though name changed
⋮----
# update metadata only on colC using the same ID
meta_update = await client.patch(
⋮----
async def test_update_nonexistent_collection() -> None
⋮----
"""PATCH a missing collection should return 404."""
⋮----
r = await client.patch(
⋮----
async def test_list_empty_and_multiple_collections() -> None
⋮----
"""Listing when empty and after multiple creates."""
⋮----
# ensure database is empty
empty = await client.get(
⋮----
# create several
names = ["one", "two", "three"]
⋮----
r = await client.post(
⋮----
listed = await client.get("/collections", headers=USER_1_HEADERS)
⋮----
got = [c["name"] for c in listed.json()]
⋮----
# Check ownership of collections.
async def test_ownership() -> None
⋮----
"""Try accessing and deleting collections owned by user 1 using user 2."""
⋮----
# create a collection as user 1
payload = {"name": "owned_by_user1", "metadata": {}}
⋮----
# Get the UUID of the collection
get_response = await client.get("/collections", headers=USER_1_HEADERS)
⋮----
collections = get_response.json()
⋮----
# user 2 tries to get it by ID
r2 = await client.get(f"/collections/{collection_id}", headers=USER_2_HEADERS)
⋮----
# Always ack with 204 for idempotency
r3 = await client.delete(
⋮----
# Try listing collections as user 2
r4 = await client.get("/collections", headers=USER_2_HEADERS)
⋮----
# Try patching the collection as user 2
r4 = await client.patch(
⋮----
# user 1 can delete it
r5 = await client.delete(
================
File: tests/unit_tests/test_documents_api.py
================
USER_1_HEADERS = {
USER_2_HEADERS = {
NO_SUCH_USER_HEADERS = {
async def test_documents_create_and_list_and_delete_and_search() -> None
⋮----
"""Test creating, listing, deleting, and searching documents."""
⋮----
# Create a collection for documents
collection_name = "docs_test_col"
col_payload = {"name": collection_name, "metadata": {"purpose": "doc-test"}}
create_col = await client.post(
⋮----
collection_data = create_col.json()
collection_id = collection_data["uuid"]
# Prepare a simple text file
file_content = b"Hello world. This is a test document."
files = [("files", ("test.txt", file_content, "text/plain"))]
# Create documents without metadata
resp = await client.post(
⋮----
data = resp.json()
⋮----
# added_chunk_ids should be a non-empty list of UUIDs
ids = data["added_chunk_ids"]
⋮----
# Validate each is a UUID string
⋮----
# List documents in collection, default limit 10
list_resp = await client.get(
⋮----
docs = list_resp.json()
⋮----
# Each doc should have id and text fields
⋮----
# Search documents with a valid query
search_payload = {"query": "test document", "limit": 5}
search_resp = await client.post(
⋮----
results = search_resp.json()
⋮----
# Each result should have id, score, text
⋮----
# Delete a document
doc_id = docs[0]["id"]
del_resp = await client.delete(
⋮----
# Delete non-existent document gracefully
del_resp2 = await client.delete(
# Should still return success True or 200/204; here assume 200
⋮----
async def test_documents_create_with_invalid_metadata_json() -> None
⋮----
"""Test creating documents with invalid metadata JSON."""
⋮----
# Create a collection
col_name = "meta_test_col"
collection_response = await client.post(
⋮----
collection_data = collection_response.json()
⋮----
# Prepare file
file_content = b"Sample"
files = [("files", ("a.txt", file_content, "text/plain"))]
# Provide invalid JSON
⋮----
async def test_documents_search_empty_query() -> None
⋮----
"""Test searching documents with an empty query."""
⋮----
# Create a collection for search test
col_name = "search_test_col"
⋮----
# Attempt search with empty query
⋮----
async def test_documents_in_nonexistent_collection() -> None
⋮----
"""Test operations on documents in a non-existent collection."""
⋮----
# Try listing documents in missing collection
no_such_collection = "12345678-1234-5678-1234-567812345678"
response = await client.get(
⋮----
# Try uploading to a non existent collection
file_content = b"X"
files = [("files", ("x.txt", file_content, "text/plain"))]
upload_resp = await client.post(
⋮----
# Try deleting from missing collection/document
⋮----
# Try search in missing collection
⋮----
# Not found or 404
⋮----
async def test_documents_create_with_valid_text_file_and_metadata() -> None
⋮----
"""Test creating documents with a valid text file and metadata."""
⋮----
# Create a collection first
collection_name = "doc_test_with_metadata"
⋮----
# Prepare a text file with content
file_content = b"This is a test document with metadata."
files = [("files", ("metadata_test.txt", file_content, "text/plain"))]
# Prepare metadata as JSON
metadata = [{"source": "test", "author": "user1", "importance": "high"}]
metadata_json = json.dumps(metadata)
# Create document with metadata
response = await client.post(
⋮----
data = response.json()
⋮----
# Verify each ID is a valid UUID
⋮----
UUID(chunk_id) # This will raise an exception if invalid
# Verify document was added by listing documents
list_response = await client.get(
⋮----
documents = list_response.json()
⋮----
# Verify metadata was attached
doc = documents[0]
⋮----
# The file_id will be a new UUID, so we can't check the exact value
async def test_documents_create_with_valid_text_file_without_metadata() -> None
⋮----
"""Test creating documents with a valid text file without metadata."""
⋮----
collection_name = "doc_test_without_metadata"
⋮----
file_content = b"This is a test document without metadata."
files = [("files", ("no_metadata_test.txt", file_content, "text/plain"))]
# Create document without metadata
⋮----
# Verify content is in the document
⋮----
async def test_documents_create_with_empty_file() -> None
⋮----
"""Test creating documents with an empty file."""
⋮----
collection_name = "doc_test_empty_file"
⋮----
# Prepare an empty file
file_content = b""
files = [("files", ("empty.txt", file_content, "text/plain"))]
# Create document with empty file
⋮----
# Empty files should be rejected with 400 Bad Request
⋮----
async def test_documents_create_with_invalid_metadata_format() -> None
⋮----
"""Test creating documents with invalid metadata format."""
⋮----
collection_name = "doc_test_invalid_metadata"
⋮----
file_content = b"This is a test document with invalid metadata."
files = [("files", ("invalid_metadata.txt", file_content, "text/plain"))]
# Invalid JSON format for metadata
invalid_metadata = "not a json"
# Create document with invalid metadata
⋮----
# Test with metadata that's not a list
invalid_metadata_not_list = json.dumps({"key": "value"})
⋮----
async def test_documents_create_with_non_existent_collection() -> None
⋮----
"""Test creating documents in a non-existent collection."""
⋮----
file_content = b"This is a test document for a non-existent collection."
files = [("files", ("nonexistent.txt", file_content, "text/plain"))]
# Try to create document in a non-existent collection
uuid = "12345678-1234-5678-1234-567812345678"
⋮----
async def test_documents_create_with_multiple_files()
⋮----
"""Test creating documents with multiple files."""
⋮----
collection_name = "doc_test_multiple_files"
⋮----
# Prepare multiple files
files = [
# Create document with multiple files
⋮----
# We should have at least 2 chunks (one for each file)
⋮----
# Verify documents were added by listing documents
⋮----
# The number of documents returned might not match the number of files
# exactly, as documents are chunked and only one chunk per file_id is returned
⋮----
async def test_documents_create_with_mismatched_metadata()
⋮----
"""Test creating documents with metadata count not matching files count."""
⋮----
collection_name = "doc_test_mismatched_metadata"
⋮----
# Metadata with only one entry for two files
metadata = [{"source": "test"}]
⋮----
# Create document with mismatched metadata
⋮----
async def test_documents_create_ownership_validation()
⋮----
"""Test creating documents with a different user than the collection owner."""
⋮----
# Create a collection as USER_1
collection_name = "doc_test_ownership"
⋮----
# Prepare a file
file_content = b"This is a test document for ownership validation."
files = [("files", ("ownership.txt", file_content, "text/plain"))]
# Try to create document as USER_2
⋮----
# Should return 404 as USER_2 can't see USER_1's collection
================
File: tests/unit_tests/test_imports.py
================
"""Placeholder unit tests."""
def test_import_app() -> None
⋮----
"""Sample test that does not do much."""
from langconnect.server import APP # noqa: F401
================
File: .dockerignore
================
# Git
.git
.gitignore
.github
# Docker
.dockerignore
Dockerfile
docker-compose.yml
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
venv/
.env
.venv
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
================
File: .env.example
================
# API key for the embeddings model. Defaults to OpenAI embeddings
OPENAI_API_KEY=
# PostgreSQL configuration
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=postgres
POSTGRES_PASSWORD=password
POSTGRES_DB=langconnect_dev
# CORS configuration. Must be a JSON array of strings
ALLOW_ORIGINS=["http://localhost:3000"]
# For authentication
SUPABASE_URL=
# This must be the service role key
SUPABASE_KEY=
================
File: .gitignore
================
.vs/
.vscode/
.idea/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Swp files
*.swp
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Google GitHub Actions credentials files created by:
# https://github.com/google-github-actions/auth
#
# That action recommends adding this gitignore to prevent accidentally committing keys.
gha-creds-*.json
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
.codspeed/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
docs/docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
notebooks/
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv*
venv*
env/
ENV/
env.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.mypy_cache_test/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# macOS display setting files
.DS_Store
# Wandb directory
wandb/
# asdf tool versions
.tool-versions
/.ruff_cache/
node_modules
_dist
prof
virtualenv/
================
File: docker-compose.test.yml
================
services:
postgres_test:
image: pgvector/pgvector:pg16
container_name: langconnect-postgres-test
ports:
- "5432:5432"
environment:
# TODO: Change env variables on CI and in this docker-compose file
# to match the dev docker compose file (uses standard postgres/postgres)
POSTGRES_USER: langchain
POSTGRES_PASSWORD: langchain
POSTGRES_DB: langchain_test
volumes:
- postgres_test_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres"]
interval: 5s
timeout: 5s
retries: 5
volumes:
postgres_test_data:
================
File: docker-compose.yml
================
services:
postgres:
image: pgvector/pgvector:pg16
container_name: langconnect-postgres
restart: always
ports:
- "5432:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres"]
interval: 5s
timeout: 5s
retries: 5
api:
build:
context: .
dockerfile: Dockerfile
container_name: langconnect-api
restart: always
depends_on:
postgres:
condition: service_healthy
ports:
- "8080:8080"
env_file:
- .env
environment:
POSTGRES_HOST: postgres
POSTGRES_PORT: 5432
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
volumes:
- ./langconnect:/app/langconnect
volumes:
postgres_data:
================
File: Dockerfile
================
FROM python:3.11-slim
WORKDIR /app
# Copy requirements first for better layer caching
COPY pyproject.toml uv.lock ./
# Copy application code (needs to be done before pip install .[dev])
COPY . .
# Install build dependencies and runtime dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends gcc python3-dev libpq-dev && \
pip install --no-cache-dir pip -U && \
pip install --no-cache-dir hatch && \
pip install --no-cache-dir '.[dev]' && \
# Purge build-only dependencies
apt-get purge -y --auto-remove gcc python3-dev && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Expose the application port
EXPOSE 8080
# Command to run the application
CMD ["uvicorn", "langconnect.server:APP", "--host", "0.0.0.0", "--port", "8080"]
================
File: INSTRUCTIONS.md
================
You're implementing a REST API for a RAG system. You are to use FastAPI and LangChain.
Below are instructions on all of the different endpoints you need to implement.
# API Endpoint Definitions
## Collections
Manage vector store collections.
- POST /collections
- Creates a new collection.
- Request Body: JSON containing collection details (e.g., {'name': 'my_collection'}).
- Response: Details of the created collection or confirmation.
- GET /collections
- Lists all available collections.
- Response: List of collection identifiers or objects.
- GET /collections/{collection_id}
- Retrieves details of a specific collection.
- Path Parameter: collection_id - The ID of the collection to retrieve.
- Response: Details of the specified collection.
- PUT /collections/{collection_id}
- Updates/replaces an existing collection (e.g., rename).
- Path Parameter: collection_id - The ID of the collection to update.
- Request Body: JSON containing the full updated collection details.
- Response: Details of the updated collection.
- PATCH /collections/{collection_id}
- Partially updates an existing collection.
- Path Parameter: collection_id - The ID of the collection to update.
- Request Body: JSON containing the specific fields to update.
- Response: Details of the updated collection.
- DELETE /collections/{collection_id}
- Deletes a specific collection.
- Path Parameter: collection_id - The ID of the collection to delete.
- Response: Confirmation of deletion.
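As a rough illustration of the collection endpoints above (not part of the original spec), a client interaction might look like the following httpx sketch; the host, token, and payload values are placeholders:
```python
import httpx

BASE_URL = "http://localhost:8080"  # placeholder host
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder bearer token

with httpx.Client(base_url=BASE_URL, headers=HEADERS) as client:
    # Create a collection and capture its UUID from the response.
    created = client.post(
        "/collections", json={"name": "my_collection", "metadata": {"team": "docs"}}
    )
    collection_id = created.json()["uuid"]

    # List, fetch, partially update, and delete the collection.
    client.get("/collections")
    client.get(f"/collections/{collection_id}")
    client.patch(f"/collections/{collection_id}", json={"metadata": {"team": "search"}})
    client.delete(f"/collections/{collection_id}")
```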
## Documents (within Collections)
Manage documents within a specific collection (RAG functionality).
- POST /collections/{collection_id}/documents
- Indexes (adds) a new document to the specified collection.
- Path Parameter: collection_id - The ID of the collection to add the document to.
- Request Body: The document data to be indexed.
- Response: Identifier or details of the indexed document.
- GET /collections/{collection_id}/documents
- Lists all documents within a specific collection.
- Path Parameter: collection_id - The ID of the collection.
- Query Parameters (Optional):
- query={search_terms}: Filter documents based on search terms.
- limit={N}: Limit the number of results.
- offset={M}: Skip the first M results (for pagination).
- Response: List of document identifiers or objects within the collection.
- GET /collections/{collection_id}/documents/{document_id}
- Retrieves a specific document from a collection.
- Path Parameters:
- collection_id: The ID of the collection.
- document_id: The ID of the document to retrieve.
- Response: The content or details of the specified document.
- PUT /collections/{collection_id}/documents/{document_id}
- Updates/replaces an existing document in a collection.
- Path Parameters:
- collection_id: The ID of the collection.
- document_id: The ID of the document to update.
- Request Body: The full updated document data.
- Response: Details of the updated document.
- PATCH /collections/{collection_id}/documents/{document_id}
- Partially updates an existing document in a collection.
- Path Parameters:
- collection_id: The ID of the collection.
- document_id: The ID of the document to update.
- Request Body: JSON containing the specific fields/parts of the document to update.
- Response: Details of the updated document.
- DELETE /collections/{collection_id}/documents/{document_id}
- Deletes a specific document from a collection.
- Path Parameters:
- collection_id: The ID of the collection.
- document_id: The ID of the document to delete.
- Response: Confirmation of deletion.
- POST /collections/{collection_id}/documents/search (Alternative Search)
- Performs a search within a specific collection using potentially complex criteria.
- Use this if GET with query parameters is insufficient (e.g., requires a request body).
- Path Parameter: collection_id - The ID of the collection to search within.
- Request Body: JSON containing search criteria.
- Response: List of matching documents.
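Likewise, a hedged sketch of uploading and searching documents through these endpoints; the multipart field name `files` and the search body mirror the tests elsewhere in this repository, while the host, token, and collection UUID are placeholders:
```python
import httpx

BASE_URL = "http://localhost:8080"  # placeholder host
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder bearer token
collection_id = "<collection-uuid>"  # placeholder collection UUID

with httpx.Client(base_url=BASE_URL, headers=HEADERS) as client:
    # Index a plain-text file into the collection (multipart upload).
    files = [("files", ("example.txt", b"Hello world. This is a test.", "text/plain"))]
    client.post(f"/collections/{collection_id}/documents", files=files)

    # Search the collection using a JSON request body.
    client.post(
        f"/collections/{collection_id}/documents/search",
        json={"query": "hello", "limit": 5},
    )
```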
## LangChain Integration
Please set up this application with LangChain document loaders, text splitters, and vector stores.
### Document Loaders
You should use the `UploadFile` type from FastAPI for document uploads to the API. Then, use the `Blob` class from `langchain_core.documents` to load the uploaded file as a blob.
Finally, use the `MimeTypeBasedParser` from `langchain_community.document_loaders.parsers.generic` to parse the blob into a document. Here is some example code showing the types of documents you should support:
```python
from langchain_community.document_loaders.parsers import BS4HTMLParser, PDFMinerParser
from langchain_community.document_loaders.parsers.generic import MimeTypeBasedParser
from langchain_community.document_loaders.parsers.msword import MsWordParser
from langchain_community.document_loaders.parsers.txt import TextParser
HANDLERS = {
"application/pdf": PDFMinerParser(),
"text/plain": TextParser(),
"text/html": BS4HTMLParser(),
"application/msword": MsWordParser(),
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": (
MsWordParser()
),
}
SUPPORTED_MIMETYPES = sorted(HANDLERS.keys())
MIMETYPE_BASED_PARSER = MimeTypeBasedParser(
handlers=HANDLERS,
fallback_parser=None,
)
```
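As a minimal sketch (assuming the `MIMETYPE_BASED_PARSER` defined above and a FastAPI `UploadFile` argument; the helper name is illustrative), an uploaded file can be turned into LangChain documents like this:
```python
from fastapi import UploadFile
from langchain_core.documents import Blob


async def parse_upload(file: UploadFile):
    """Read an uploaded file and parse it into LangChain documents."""
    contents = await file.read()
    # Fall back to text/plain when the client sends no content type.
    blob = Blob(data=contents, mimetype=file.content_type or "text/plain")
    return MIMETYPE_BASED_PARSER.parse(blob)
```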
### Text Splitters
For text splitting, you should use the `RecursiveCharacterTextSplitter` from `langchain_text_splitters`. Set the following parameters:
`chunk_size=1000, chunk_overlap=200`.
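A short sketch of that configuration (variable names are illustrative):
```python
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

TEXT_SPLITTER = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)

# Split parsed documents into ~1000-character chunks with 200-character overlap.
docs = [Document(page_content="some long parsed text ...", metadata={"file_id": "..."})]
chunks = TEXT_SPLITTER.split_documents(docs)
```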
### Vector Stores
For the vector store, use the PGVector LangChain integration. For connection details, use environment variables. Import from the `langchain_postgres` package.
You should also use Postgres to create, fetch, search, and delete collections.
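A hedged sketch of wiring PGVector up from environment variables (the connection-string scheme matches the repository's `connection.py`; the collection name and embedding model are assumptions):
```python
import os

from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector

# Build a psycopg connection string from the standard POSTGRES_* variables.
connection = (
    "postgresql+psycopg://"
    f"{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}"
    f"@{os.environ['POSTGRES_HOST']}:{os.environ['POSTGRES_PORT']}/{os.environ['POSTGRES_DB']}"
)

store = PGVector(
    embeddings=OpenAIEmbeddings(),
    collection_name="my_collection",  # illustrative collection name
    connection=connection,
    use_jsonb=True,
)
```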
================
File: LICENSE
================
MIT License
Copyright (c) LangChain, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
================
File: Makefile
================
.PHONY: format lint lint-fix build up up-dev down logs restart clean help test
format:
ruff format .
ruff check --fix .
unsafe_fixes:
ruff check --fix --unsafe-fixes .
lint:
ruff check .
ruff format --diff
TEST_FILE ?= tests/unit_tests
test:
IS_TESTING=true uv run pytest $(TEST_FILE)
help:
@echo "Available commands:"
@echo " make format - Format code with ruff"
@echo " make lint - Check code with ruff"
@echo " make lint-fix - Fix linting issues with ruff"
@echo " make test - Run unit tests"
@echo " make build - Build Docker images"
@echo " make up - Start all services in detached mode"
@echo " make up-dev - Start all services with live reload"
@echo " make down - Stop all services"
@echo " make logs - View logs of all services"
@echo " make restart - Restart all services"
@echo " make clean - Remove containers, volumes, and images"
build:
docker-compose build
up:
docker-compose up -d
up-dev:
docker-compose up
down:
docker-compose down
logs:
docker-compose logs -f
restart:
docker-compose restart
clean:
docker-compose down -v
docker rmi langconnect-api:latest 2>/dev/null || true
================
File: pyproject.toml
================
[project]
name = "langconnect"
version = "0.0.1"
description = "LangConnect: A RAG service"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.115.6",
"langchain>=0.3.20",
"langchain-openai>=0.3.7",
"langchain-community>=0.0.20",
"langchain-core>=0.2.37",
"langchain-text-splitters>=0.0.1",
"langchain-postgres>=0.0.2",
"langgraph-sdk>=0.1.48",
"python-dotenv>=1.0.1",
"uvicorn>=0.34.0",
"aiohttp>=3.11.13",
"python-multipart>=0.0.20",
"httpx>=0.28.1",
"beautifulsoup4>=4.12.3",
"pdfminer.six>=20250416",
"asyncpg>=0.30.0",
"psycopg[binary]>=3.2.6",
"pillow>=11.2.1",
"lxml>=5.4.0",
"unstructured>=0.17.2",
"supabase>=2.15.1",
]
[project.packages]
find = { where = ["langconnect"] }
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["langconnect"]
[dependency-groups]
dev = [
"ruff>=0.8.4",
"langgraph-api>=0.0.28",
"langgraph-cli>=0.1.75",
"pytest-socket>=0.7.0",
"pytest-timeout>=2.4.0",
"pytest-asyncio>=0.26.0",
]
[tool.pytest.ini_options]
minversion = "8.0"
# -ra: Report all extra test outcomes (passed, skipped, failed, etc.)
# -q: Enable quiet mode for less cluttered output
# -v: Enable verbose output to display detailed test names and statuses
# --durations=5: Show the 5 slowest tests after the run (useful for performance tuning)
addopts = "-ra -q -v --durations=5"
testpaths = [
"tests",
]
python_files = ["test_*.py"]
python_functions = ["test_*"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
[tool.ruff]
line-length = 88
target-version = "py311"
[tool.ruff.lint]
select = [
"ALL",
]
ignore = [
"COM812",
"ANN001",
"ANN201",
"ARG001",
"B008",
"B904",
"BLE001",
"C901",
"D100",
"D101",
"D104",
"D106",
"D205",
"E501",
"EM101",
"EM102",
"ERA001",
"FAST002",
"G004",
"G201",
"PLR0912",
"PLR0915",
"PLW0603",
"RET504",
"RUF006",
"S104",
"T201",
"TC002",
"TID252",
"TD003",
"FIX002",
"TRY003",
"TRY004",
"TRY201",
"TRY300",
"TRY301",
"TRY401",
"UP007",
"W291"
]
[tool.ruff.lint.per-file-ignores]
"tests/**/*.py" = [
"S101", # bare asserts
"ARG", # unused-argument
"FBT", # boolean-tuple-for-parameter
"D104", # missing docstring in package
"PLR2004", # magic-values-in-comparison
"S311", # use of non-crypto RNG
]
[tool.ruff.lint.pydocstyle]
convention = "google"
================
File: README.md
================
# LangConnect
LangConnect is a RAG (Retrieval-Augmented Generation) service built with FastAPI and LangChain. It provides a REST API for managing collections and documents, with PostgreSQL and pgvector for vector storage.
## Features
- FastAPI-based REST API
- PostgreSQL with pgvector for document storage and vector embeddings
- Docker support for easy deployment
## Getting Started
### Prerequisites
- Docker and Docker Compose
- Python 3.11 or higher
### Running with Docker
1. Clone the repository:
```bash
git clone https://github.com/langchain-ai/langconnect.git
cd langconnect
```
2. Start the services:
```bash
docker-compose up -d
```
This will:
- Start a PostgreSQL database with pgvector extension
- Build and start the LangConnect API service
3. Access the API:
- API documentation: http://localhost:8080/docs
- Health check: http://localhost:8080/health
### Development
To run the services in development mode with live reload:
```bash
docker-compose up
```
## API Documentation
The API documentation is available at http://localhost:8080/docs when the service is running.
## Environment Variables
The following environment variables can be configured in the `docker-compose.yml` file:
| Variable | Description | Default |
|----------|-------------|---------|
| POSTGRES_HOST | PostgreSQL host | postgres |
| POSTGRES_PORT | PostgreSQL port | 5432 |
| POSTGRES_USER | PostgreSQL username | postgres |
| POSTGRES_PASSWORD | PostgreSQL password | postgres |
| POSTGRES_DB | PostgreSQL database name | postgres |
## License
This project is licensed under the MIT License; see the LICENSE file included in the repository.
## Endpoints
### Collections
#### `/collections` (GET)
List all collections.
#### `/collections` (POST)
Create a new collection.
#### `/collections/{collection_id}` (GET)
Get a specific collection by ID.
#### `/collections/{collection_id}` (DELETE)
Delete a specific collection by ID.
### Documents
#### `/collections/{collection_id}/documents` (GET)
List all documents in a specific collection.
#### `/collections/{collection_id}/documents` (POST)
Create a new document in a specific collection.
#### `/collections/{collection_id}/documents/{document_id}` (DELETE)
Delete a specific document by ID.
#### `/collections/{collection_id}/documents/search` (POST)
Search for documents using semantic search.
================================================================
End of Codebase
================================================================