Build super simple end-to-end data & ETL pipelines for your vector databases and Generative AI applications
MIT License
VectorETL by Context Data is a modular framework designed to help Data & AI engineers process data for their AI applications in just a few minutes!
VectorETL streamlines the process of converting diverse data sources into vector embeddings and storing them in various vector databases. It supports multiple data sources (databases, cloud storage, and local files), various embedding models (including OpenAI, Cohere, and Google Gemini), and several vector database targets (like Pinecone, Qdrant, and Weaviate).
This pipeline aims to simplify the creation and management of vector search systems, enabling developers and data scientists to easily build and scale applications that require semantic search, recommendation systems, or other vector-based operations.
pip install --upgrade vector-etl
or
pip install git+https://github.com/ContextData/VectorETL.git
This section provides instructions on how to use the ETL framework for Vector Databases. We'll cover how to run the pipeline, how to validate configurations, and some common usage examples.
Assume you have a configuration file similar to the one below:
source:
  source_data_type: "database"
  db_type: "postgres"
  host: "localhost"
  database_name: "customer_data"
  username: "user"
  password: "password"
  port: 5432
  query: "SELECT * FROM customers WHERE updated_at > :last_updated_at"
  batch_size: 1000
  chunk_size: 1000
  chunk_overlap: 0

embedding:
  embedding_model: "OpenAI"
  api_key: ${OPENAI_API_KEY}
  model_name: "text-embedding-ada-002"

target:
  target_database: "Pinecone"
  pinecone_api_key: ${PINECONE_API_KEY}
  index_name: "customer-embeddings"
  dimension: 1536
  metric: "cosine"

embed_columns:
  - "customer_name"
  - "customer_description"
  - "purchase_history"
You can then import the configuration into your Python project and run it from there:
from vector_etl import create_flow
flow = create_flow()
flow.load_yaml('/path/to/your/config.yaml')
flow.execute()
Using the same YAML configuration file shown above, you can run the process directly from the command line without importing it into a Python application.
To run the ETL framework, use the following command:
vector-etl -c /path/to/your/config.yaml
import os

from vector_etl import create_flow

source = {
    "source_data_type": "database",
    "db_type": "postgres",
    "host": "localhost",
    "port": "5432",
    "database_name": "test",
    "username": "user",
    "password": "password",
    "query": "select * from test",
    "batch_size": 1000,
    "chunk_size": 1000,
    "chunk_overlap": 0,
}

embedding = {
    "embedding_model": "OpenAI",
    "api_key": os.environ["OPENAI_API_KEY"],
    "model_name": "text-embedding-ada-002"
}

target = {
    "target_database": "Pinecone",
    "pinecone_api_key": os.environ["PINECONE_API_KEY"],
    "index_name": "my-pinecone-index",
    "dimension": 1536
}
embed_columns = ["customer_name", "customer_description", "purchase_history"]
flow = create_flow()
flow.set_source(source)
flow.set_embedding(embedding)
flow.set_target(target)
flow.set_embed_columns(embed_columns)
# Execute the flow
flow.execute()
Here are some examples of how to use the ETL framework for different scenarios:
vector-etl -c config/postgres_to_pinecone.yaml
Where postgres_to_pinecone.yaml might look like:
source:
  source_data_type: "database"
  db_type: "postgres"
  host: "localhost"
  database_name: "customer_data"
  username: "user"
  password: "password"
  port: 5432
  query: "SELECT * FROM customers WHERE updated_at > :last_updated_at"
  batch_size: 1000
  chunk_size: 1000
  chunk_overlap: 0

embedding:
  embedding_model: "OpenAI"
  api_key: ${OPENAI_API_KEY}
  model_name: "text-embedding-ada-002"

target:
  target_database: "Pinecone"
  pinecone_api_key: ${PINECONE_API_KEY}
  index_name: "customer-embeddings"
  dimension: 1536
  metric: "cosine"

embed_columns:
  - "customer_name"
  - "customer_description"
  - "purchase_history"
vector-etl -c config/s3_to_qdrant.yaml
Where s3_to_qdrant.yaml might look like:
source:
  source_data_type: "Amazon S3"
  bucket_name: "my-data-bucket"
  prefix: "customer_data/"
  file_type: "csv"
  aws_access_key_id: ${AWS_ACCESS_KEY_ID}
  aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY}
  chunk_size: 1000
  chunk_overlap: 200

embedding:
  embedding_model: "Cohere"
  api_key: ${COHERE_API_KEY}
  model_name: "embed-english-v2.0"

target:
  target_database: "Qdrant"
  qdrant_url: "https://your-qdrant-cluster-url.qdrant.io"
  qdrant_api_key: ${QDRANT_API_KEY}
  collection_name: "customer_embeddings"

embed_columns: []
The VectorETL (Extract, Transform, Load) framework is a powerful and flexible tool designed to streamline the process of extracting data from various sources, transforming it into vector embeddings, and loading these embeddings into a range of vector databases.
It's built with modularity, scalability, and ease of use in mind, making it an ideal solution for organizations looking to leverage the power of vector search in their data infrastructure.
Versatile Data Extraction: The framework supports a wide array of data sources, including traditional databases, cloud storage solutions (like Amazon S3 and Google Cloud Storage), and popular SaaS platforms (such as Stripe and Zendesk). This versatility allows you to consolidate data from multiple sources into a unified vector database.
Advanced Text Processing: For textual data, the framework implements sophisticated chunking and overlapping techniques. This ensures that the semantic context of the text is preserved when creating vector embeddings, leading to more accurate search results.
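To make the idea concrete, here is a minimal sketch of fixed-size chunking with overlap (purely illustrative, not VectorETL's internal implementation; in practice the chunk_size and chunk_overlap settings in the configuration control this behaviour):

def chunk_text(text, chunk_size=1000, chunk_overlap=200):
    """Split text into windows of chunk_size characters, where consecutive
    windows share chunk_overlap characters, so content near a boundary still
    appears intact in at least one chunk (assumes chunk_size > chunk_overlap)."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - chunk_overlap, 1), step)]

# A 2,600-character document with chunk_size=1000 and chunk_overlap=200
# yields chunks covering characters 0-1000, 800-1800, and 1600-2600.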
State-of-the-Art Embedding Models: The system integrates with leading embedding models, including OpenAI, Cohere, Google Gemini, and Azure OpenAI. This allows you to choose the embedding model that best fits your specific use case and quality requirements.
Multiple Vector Database Support: Whether you're using Pinecone, Qdrant, Weaviate, SingleStore, Supabase, or LanceDB, this framework has you covered. It's designed to seamlessly interface with these popular vector databases, allowing you to choose the one that best suits your needs.
Configurable and Extensible: The entire framework is highly configurable through YAML or JSON configuration files. Moreover, its modular architecture makes it easy to extend with new data sources, embedding models, or vector databases as your needs evolve.
This ETL framework is ideal for organizations looking to implement or upgrade their vector search capabilities.
By automating the process of extracting data, creating vector embeddings, and storing them in a vector database, this framework significantly reduces the time and complexity involved in setting up a vector search system. It allows data scientists and engineers to focus on deriving insights and building applications, rather than worrying about the intricacies of data processing and vector storage.
The ETL framework uses a configuration file to specify the details of the source, embedding model, target database, and other parameters. You can use either YAML or JSON format for the configuration file.
The configuration file is divided into the following main sections:
source: Specifies the data source details
embedding: Defines the embedding model to be used
target: Outlines the target vector database
embed_columns: Defines the columns that need to be embedded (mainly for structured data sources)

You can define the configuration directly in Python:

import os

from vector_etl import create_flow
source = {
    "source_data_type": "database",
    "db_type": "postgres",
    "host": "localhost",
    "port": "5432",
    "database_name": "test",
    "username": "user",
    "password": "password",
    "query": "select * from test",
    "batch_size": 1000,
    "chunk_size": 1000,
    "chunk_overlap": 0,
}

embedding = {
    "embedding_model": "OpenAI",
    "api_key": os.environ["OPENAI_API_KEY"],
    "model_name": "text-embedding-ada-002"
}

target = {
    "target_database": "Pinecone",
    "pinecone_api_key": os.environ["PINECONE_API_KEY"],
    "index_name": "my-pinecone-index",
    "dimension": 1536
}
embed_columns = ["customer_name", "customer_description", "purchase_history"]
A complete configuration file in YAML format looks like this:

source:
  source_data_type: "database"
  db_type: "postgres"
  host: "localhost"
  database_name: "mydb"
  username: "user"
  password: "password"
  port: 5432
  query: "SELECT * FROM mytable WHERE updated_at > :last_updated_at"
  batch_size: 1000
  chunk_size: 1000
  chunk_overlap: 0

embedding:
  embedding_model: "OpenAI"
  api_key: "your-openai-api-key"
  model_name: "text-embedding-ada-002"

target:
  target_database: "Pinecone"
  pinecone_api_key: "your-pinecone-api-key"
  index_name: "my-index"
  dimension: 1536
  metric: "cosine"
  cloud: "aws"
  region: "us-west-2"

embed_columns:
  - "column1"
  - "column2"
  - "column3"
The same configuration in JSON format:

{
  "source": {
    "source_data_type": "database",
    "db_type": "postgres",
    "host": "localhost",
    "database_name": "mydb",
    "username": "user",
    "password": "password",
    "port": 5432,
    "query": "SELECT * FROM mytable WHERE updated_at > :last_updated_at",
    "batch_size": 1000,
    "chunk_size": 1000,
    "chunk_overlap": 0
  },
  "embedding": {
    "embedding_model": "OpenAI",
    "api_key": "your-openai-api-key",
    "model_name": "text-embedding-ada-002"
  },
  "target": {
    "target_database": "Pinecone",
    "pinecone_api_key": "your-pinecone-api-key",
    "index_name": "my-index",
    "dimension": 1536,
    "metric": "cosine",
    "cloud": "aws",
    "region": "us-west-2"
  },
  "embed_columns": ["column1", "column2", "column3"]
}
The source section varies based on the source_data_type. Here are examples for different source types:
{
  "source_data_type": "database",
  "db_type": "postgres",  # or "mysql", "snowflake", "salesforce"
  "host": "localhost",
  "database_name": "mydb",
  "username": "user",
  "password": "password",
  "port": 5432,
  "query": "SELECT * FROM mytable WHERE updated_at > :last_updated_at",
  "batch_size": 1000,
  "chunk_size": 1000,
  "chunk_overlap": 0
}
source:
  source_data_type: "database"
  db_type: "postgres" # or "mysql", "snowflake", "salesforce"
  host: "localhost"
  database_name: "mydb"
  username: "user"
  password: "password"
  port: 5432
  query: "SELECT * FROM mytable WHERE updated_at > :last_updated_at"
  batch_size: 1000
  chunk_size: 1000
  chunk_overlap: 0
{
  "source_data_type": "Amazon S3",
  "bucket_name": "my-bucket",
  "key": "path/to/files/",
  "file_type": ".csv",
  "aws_access_key_id": "your-access-key",
  "aws_secret_access_key": "your-secret-key"
}
source:
  source_data_type: "Amazon S3"
  bucket_name: "my-bucket"
  key: "path/to/files/"
  file_type: ".csv"
  aws_access_key_id: "your-access-key"
  aws_secret_access_key: "your-secret-key"
{
  "source_data_type": "Google Cloud Storage",
  "credentials_path": "/path/to/your/credentials.json",
  "bucket_name": "myBucket",
  "prefix": "prefix/",
  "file_type": "csv",
  "chunk_size": 1000,
  "chunk_overlap": 0
}
source:
  source_data_type: "Google Cloud Storage"
  credentials_path: "/path/to/your/credentials.json"
  bucket_name: "myBucket"
  prefix: "prefix/"
  file_type: "csv"
  chunk_size: 1000
  chunk_overlap: 0
Starting from version 0.1.6.3, users can utilize Unstructured's Serverless API to efficiently extract data from a multitude of file-based sources.
NOTE: This is limited to the Unstructured Serverless API and should not be used with Unstructured's open-source framework. Only PDF, DOCX, DOC, and TXT files are supported.
In order to use Unstructured, you will need three additional parameters:

use_unstructured: (True/False) indicator telling the framework to use the Unstructured API
unstructured_api_key: Enter your Unstructured API key
unstructured_url: Enter the API URL from your Unstructured dashboard

# Example using Local file
source:
  source_data_type: "Local File"
  file_path: "/path/to/file.docx"
  file_type: "docx"
  use_unstructured: True
  unstructured_api_key: 'my-unstructured-key'
  unstructured_url: 'https://my-domain.api.unstructuredapp.io'
# Example using Amazon S3
source:
  source_data_type: "Amazon S3"
  bucket_name: "myBucket"
  prefix: "Dir/Subdir/"
  file_type: "pdf"
  aws_access_key_id: "your-access-key"
  aws_secret_access_key: "your-secret-access-key"
  use_unstructured: True
  unstructured_api_key: 'my-unstructured-key'
  unstructured_url: 'https://my-domain.api.unstructuredapp.io'
The embedding section specifies which embedding model to use:
embedding:
  embedding_model: "OpenAI" # or "Cohere", "Google Gemini", "Azure OpenAI", "Hugging Face"
  api_key: "your-api-key"
  model_name: "text-embedding-ada-002" # model name varies by provider
The target section varies based on the chosen vector database. Here's an example for Pinecone:
target:
  target_database: "Pinecone"
  pinecone_api_key: "your-pinecone-api-key"
  index_name: "my-index"
  dimension: 1536
  metric: "cosine"
  cloud: "aws"
  region: "us-west-2"
The embed_columns list specifies which columns from the source data should be used to generate the embeddings (this only applies to database sources for now):
embed_columns:
  - "column1"
  - "column2"
  - "column3"
The embed_columns list is only required for structured data sources (e.g. PostgreSQL, MySQL, Snowflake). For all other sources, use an empty list:

embed_columns: []
To protect sensitive information like API keys and passwords, consider using environment variables or a secure secrets management system. You can then reference these in your configuration file:
embedding:
  api_key: ${OPENAI_API_KEY}
This allows you to keep your configuration files in version control without exposing sensitive data.
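For example, a small pre-flight check (a hypothetical helper, not part of the framework) can verify that every variable referenced as ${...} in your configuration is actually exported before the flow runs:

import os

# Hypothetical helper: fail fast if secrets referenced in the config are missing.
required = ("OPENAI_API_KEY", "PINECONE_API_KEY")
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise RuntimeError("Missing environment variables: " + ", ".join(missing))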
Remember to adjust your configuration based on your specific data sources, embedding models, and target databases. Refer to the documentation for each service to ensure you're providing all required parameters.
We welcome contributions to the ETL Framework for Vector Databases! Whether you're fixing bugs, improving documentation, or proposing new features, your efforts are appreciated. Here's how you can contribute:
If you encounter a bug or have a suggestion for improving the ETL framework:
We're always looking for ways to make the ETL framework better. If you have ideas:
We actively welcome your pull requests against main. To maintain consistency throughout the project, please adhere to these coding standards:
Improving documentation is always appreciated:
If you're thinking about adding a new feature:

To add a new data source, create a class in the source_mods directory and add it to the get_source_class function in source_mods/__init__.py.
To add a new embedding model, create a class in the embedding_mods directory and add it to the get_embedding_model function in embedding_mods/__init__.py.
To add a new target database, create a class in the target_mods directory and add it to the get_target_database function in target_mods/__init__.py.
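As a rough sketch of this pattern (the module, class, and method names and the lookup key below are hypothetical and only for illustration; mirror an existing connector in source_mods for the actual base-class interface):

# source_mods/my_api.py -- hypothetical new connector
class MyApiSource:
    def __init__(self, config):
        # config holds the values from the "source" section of the YAML/JSON file
        self.config = config

    def fetch_data(self):
        # hypothetical method name: return the records to be chunked and embedded
        raise NotImplementedError

# Then map a new source_data_type value (e.g. "My API") to MyApiSource in the
# lookup used by get_source_class in source_mods/__init__.py.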
We encourage all users to join our Discord server to collaborate with the Context Data development team and other contributors, suggest upgrades and new integrations, and report issues.