dlt.destinations.impl.weaviate.weaviate_client
LoadWeaviateJob Objects
class LoadWeaviateJob(RunnableLoadJob)
load_batch
@wrap_weaviate_error
def load_batch(f: IO[str]) -> None
Load all the lines from stream f using Weaviate batch.
Weaviate batch supports retries so we do not need to do that.
WeaviateClient Objects
class WeaviateClient(JobClientBase, WithStateSync)
Weaviate client implementation using v4 API.
create_db_client
@staticmethod
def create_db_client(
config: WeaviateClientConfiguration) -> weaviate.WeaviateClient
Create a Weaviate client using v4 API.
Supports three connection types:
- "cloud": For Weaviate Cloud Services (uses connect_to_weaviate_cloud)
- "local": For local Docker instances (uses connect_to_local)
- "custom": For self-hosted instances (uses connect_to_custom)
If connection_type is not specified, it's auto-detected from the URL:
- URLs containing ".weaviate.cloud" -> "cloud"
- URLs with "localhost" or "127.0.0.1" -> "local"
- Other URLs -> "custom" (requires http_port and grpc_port)
make_qualified_collection_name
def make_qualified_collection_name(table_name: str) -> str
Make a full Weaviate collection name from a table name by prepending the dataset name if it exists.
get_collection_schema
def get_collection_schema(table_name: str) -> Dict[str, Any]
Get the Weaviate collection schema for a table.
create_collection
def create_collection(collection_config: Dict[str, Any],
full_collection_name: Optional[str] = None) -> None
Create a Weaviate collection.
Arguments:
collection_config- The collection configuration to create.full_collection_name- The full name of the collection to create. If not provided, the collection name will be prepended with the dataset name if it exists.
add_property_to_collection
def add_property_to_collection(collection_name: str,
prop_schema: Dict[str, Any]) -> None
Add a property to an existing Weaviate collection.
Arguments:
collection_name- The name of the collection to add the property to.prop_schema- The property schema to add.
delete_collection
def delete_collection(collection_name: str) -> None
Delete a Weaviate collection.
Arguments:
collection_name- The name of the collection to delete.
delete_all_collections
def delete_all_collections() -> None
Delete all Weaviate collections from Weaviate instance and all data associated with it.
query_collection
def query_collection(collection_name: str,
properties: List[str],
limit: int = 100) -> List[Dict[str, Any]]
Query a Weaviate collection.
Arguments:
collection_name- The name of the collection to query.properties- The properties to return.limit- Maximum number of results to return.
Returns:
A list of objects from the collection.
create_object
def create_object(obj: Dict[str, Any], collection_name: str) -> None
Create a Weaviate object.
Arguments:
obj- The object to create.collection_name- The name of the collection to create the object in.
drop_storage
def drop_storage() -> None
Drop the dataset from Weaviate instance.
Deletes all collections in the dataset and all data associated with them. Deletes the sentinel collection as well.
If dataset name was not provided, it deletes all the tables in the current schema
get_stored_state
def get_stored_state(pipeline_name: str) -> Optional[StateInfo]
Loads compressed state from destination storage
get_stored_schema
def get_stored_schema(schema_name: str = None) -> Optional[StorageSchemaInfo]
Retrieves newest schema from destination storage
get_records
@wrap_weaviate_error
def get_records(table_name: str,
filter_field: str = None,
filter_value: Any = None,
sort_by: str = None,
sort_order: str = "asc",
limit: int = 0,
offset: int = 0,
properties: List[str] = None) -> List[Dict[str, Any]]
Get records from a collection using v4 API
make_weaviate_collection_schema
def make_weaviate_collection_schema(table_name: str) -> Dict[str, Any]
Creates a Weaviate collection schema from a table schema.
sentinel_class
@property
def sentinel_class() -> str
Alias for sentinel_collection for backwards compatibility
make_qualified_class_name
def make_qualified_class_name(table_name: str) -> str
Alias for make_qualified_collection_name for backwards compatibility
get_class_schema
def get_class_schema(table_name: str) -> Dict[str, Any]
Alias for get_collection_schema for backwards compatibility
create_class
def create_class(class_schema: Dict[str, Any],
full_class_name: Optional[str] = None) -> None
Alias for create_collection for backwards compatibility
create_class_property
def create_class_property(class_name: str, prop_schema: Dict[str,
Any]) -> None
Alias for add_property_to_collection for backwards compatibility
delete_class
def delete_class(class_name: str) -> None
Alias for delete_collection for backwards compatibility
query_class
def query_class(class_name: str, properties: List[str]) -> Any
Query a collection and return results in v3-compatible format for backwards compatibility