Distributed Data Warehousing with Bacalhau
Introduction
Expanso helps leading organizations innovate quickly by architecting, implementing, and maintaining enterprise-grade distributed compute platforms. Expanso is also the team that created and currently maintains the industry-leading open-source software Bacalhau, which is built to reduce the cost and improve the efficiency of processing large-scale datasets in distributed environments.
Challenges in traditional data warehousing
SCALABILITY
As data volume and user demands increase, centralized systems become costly and complex to scale.
PERFORMANCE
More users and applications accessing the system can degrade performance, leading to delays in data retrieval and analysis.
ENGINEERING COSTS
Centralizing data, especially from diverse sources, demands substantial engineering efforts.
MAINTENANCE
Routine updates and upkeep are resource-intensive, requiring significant investments in hardware, software, and skilled personnel.
SLOW TO ADAPT
Centralized systems are often slow and complex to adapt to new data sources or schema changes.
Given these issues, the emerging Data Mesh approach could offer a more effective solution.
Solution: Distributed data warehousing
When teams view data as a product and adopt a federated governance model, querying data directly at its source can eliminate the need for expensive ETL processes. It’s often more efficient to send computation to the data rather than moving the data to the computation, and this holds true even outside of a data mesh architecture.
By placing compute nodes near the data sources, computational tasks or queries can be sent directly to the data. This approach allows for results to be delivered efficiently through the chosen storage solution. Typically, the resulting computations are smaller and easier to transmit than entire datasets, facilitating near real-time processing. Meanwhile, backing up essential log data continues to be handled conventionally.
Implementing a distributed data warehouse offers clear benefits:
Reduced Data Movement: Directly querying data at its source significantly reduces the need to move large amounts of data to a central location. This cuts costs and eases the workload on data engineers. This setup allows users to extract concise results from their queries while continuing to archive daily data for audit trails.
Easier Scaling: A distributed data warehouse scales effectively by adding more compute nodes where data is generated. If you double the number of data sources, you don't need to double your central warehouse's capacity; you simply expand the infrastructure at the data collection points.
Real-Time Querying: Users can immediately run queries on the most recent data without waiting for daily transfers and processing. This enables decision-making based on the latest business information.
Avoid Vendor Lock-in: Relying on a single provider for a centralized data warehouse may not be the best long-term strategy. Over time, a provider's limited capabilities or changes in service can diminish effectiveness. Additionally, the sunk costs associated with one provider can make switching challenging, even when it's necessary.
Bacalhau as a distributed data warehouse orchestrator
Bacalhau lets you adopt a distributed data warehouse without overhauling your current systems. Most companies already have the data and computing power they need for analytics, scattered across different databases, servers, and edge locations.
Bacalhau makes it simple to turn your scattered resources into a dynamic data warehouse. Just install its lightweight agents right where your data lives, and you can run compute jobs on-site. This way, you avoid moving big datasets around or making big changes to your ETL processes and data models.
FLEXIBLE COMPUTE NODES
Bacalhau’s compatibility with Docker and WebAssembly enables efficient handling of diverse workloads. Its compute nodes support everything from modern .NET applications to legacy IBM AS/400 systems, allowing for custom execution engines and versatile use cases.
DIVERSE DATA ACCESS
Bacalhau cuts the high costs associated with data transfer by enabling direct access to local data, reducing the need for extensive data movement. It supports a variety of storage options including S3-compatible storage, IPFS, and direct local storage, ensuring data is readily available for computation.
SMART JOB ALLOCATION
Bacalhau allows you to precisely manage how jobs are assigned to compute nodes. You can target nodes for specific jobs using labels that denote their characteristics, and the latest updates to the platform enable even more nuanced selections based on these labels.
Implementation example
Retail Industry
SCENARIO
Consider a retail chain with multiple stores spread across different regions. Each store has its own POS system collecting sales data. Traditionally, this data is batch-uploaded to a central data warehouse at the end of each day for processing and report generation.
BACALHAU DEPLOYMENT
With Bacalhau, each store installs a compute node that processes data locally. A control plane node orchestrates tasks across the compute nodes, distributing work based on various selection criteria. Retailers can now query data in near real-time, gaining instant insights without the need for extensive data transfer or central processing infrastructure.
BENEFITS
Real-time Insights: Retailers can query data and generate reports in near real-time, enabling quicker decision-making.
Reduced Network Bandwidth: By processing data locally, Bacalhau significantly reduces the amount of data that needs to be transferred over the network.
Enhanced Security: Minimizing data transfer also reduces the risk of data breaches, ensuring better compliance with data governance regulations.
Cost-Effectiveness: Bacalhau eliminates the need for expensive central processing infrastructure, making it a cost-effective solution.
Code Implementation
Step 0 - Prerequisites
Before you start, you’ll need:
- Storage Solution: Have a storage provider or location ready for the job results.
- Firewall Configuration: Adjust your firewall settings so your node can talk to the rest of the Bacalhau network.
- Hosting Setup: Prepare a physical server, a virtual machine, or a cloud-based instance. Note that Bacalhau compute nodes should not be run inside a Docker container.
- Bacalhau CLI: Install the Bacalhau CLI on your local machine, following the official installation instructions (an example install command is shown below).
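For reference, the documentation's one-line installer looks like this at the time of writing (check the Bacalhau documentation for the current command and version):
curl -sL https://get.bacalhau.org/install.sh | bash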
Step 1 - Provisioning hardware
- Control Plane Node: This is your operation’s headquarters, coordinating tasks throughout your network.
Recommended Specs:
- Instances: 1
- Disk Space: 25-100GB
- CPU: 1-8 cores (vCPU)
- RAM: 4-16GB
- Compute Node(s): These workhorses run your code and access data that’s local or close to it.
Recommended Specs:
- Instances: 1-N (we’ll use 4 in this example, representing 4 different store locations)
- Disk Space: 32-500GB
- CPU: 1-8 cores (vCPU)
- RAM: 4-16GB
Note: It’s crucial that the Control Plane Node can communicate with the Compute Nodes. For guidance on this, you can follow this tutorial.
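A quick way to sanity-check that connectivity before going any further is to probe the requester's ports from each compute machine. The sketch below assumes Bacalhau's default API port (1234) and peer-to-peer port (1235), and uses a placeholder address; replace it with your own Control Plane Node's IP.
# Placeholder address for the Control Plane (requester) node
REQUESTER_IP=34.155.152.133
# Confirm the API and peer-to-peer ports are reachable from this compute node
nc -vz "$REQUESTER_IP" 1234
nc -vz "$REQUESTER_IP" 1235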
Step 2 - Installing the compute and requester nodes
Bacalhau uses a node called a requester node to orchestrate jobs in the network, communicating with the compute nodes to distribute work according to the various selection criteria. Once you have installed Bacalhau, you can run the requester node as follows.
bacalhau serve \
--node-type requester \
--private-internal-ipfs=false
- Set Environment Variables: Record the environment variables from the requester node and apply them to each machine designated as a compute node.
- Install Compute Nodes: At each store location with data, you’ll need to install a Bacalhau compute node on a machine with data access. Use the standard Bacalhau installation instructions, but for a private network, you’ll specify a unique --peer value.
- Job Distribution: To distribute jobs to the compute nodes, you have two main strategies:
- Job Selection Policies: Implement custom logic to decide if a node should run a job. More details can be found in the job selection policy documentation.
- Node Labels: For a simpler approach, use node labels to target specific nodes or groups of nodes. This is the recommended method for this guide.
# We added a path to allow-listed-local-paths where we store our data, and
# a set of labels to allow us to target specific nodes
bacalhau serve \
--node-type compute \
--ipfs-connect $IPFS_CONNECT \
--private-internal-ipfs=false \
--labels "storeid=1,region=EU,country=FR,city=Paris" \
--allow-listed-local-paths "/node/data" \
--peer env
Note the addition of labels, which allow us to target specific nodes when we run our jobs. Here we add a store identifier, a region, a country and a city so that we can target queries in our warehouse to any of these labels. In reality, we may add more metadata here to provide even more flexibility in precisely targeting stores by one or more of these labels.
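For instance, a hypothetical flagship store in Berlin could be started with a couple of extra labels; the storetype and timezone keys below are made up purely for illustration.
bacalhau serve \
--node-type compute \
--ipfs-connect $IPFS_CONNECT \
--private-internal-ipfs=false \
--labels "storeid=3,region=EU,country=DE,city=Berlin,storetype=flagship,timezone=CET" \
--allow-listed-local-paths "/node/data" \
--peer env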
For this example, we’ll use nodes with the following labels:
Node (ID) | Region | Country | City |
---|---|---|---|
1 | EU | FR | Paris |
2 | NorthAmerica | CA | Toronto |
3 | EU | DE | Berlin |
4 | NorthAmerica | US | New York |
By strategically applying labels, you can direct jobs to any of the following (example selector commands appear after this list):
- All stores, regardless of location.
- A single store within a specific region.
- All stores within a particular region.
- A single store within a given country.
- All stores across a specific country.
- A particular set of stores identified by their IDs.
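As a sketch, those targeting options map onto selector flags like this (using the $TRXN_DATA_INPUT input definition introduced in Step 3 and a trivial row-count query):
# All stores within a particular region (every matching node runs the job)
bacalhau docker run -s region=EU --target=all -f -i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 "SELECT COUNT(*) FROM '/inputs/trxn.csv';"
# A single store within a given country (Bacalhau picks one matching node)
bacalhau docker run -s country=DE -f -i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 "SELECT COUNT(*) FROM '/inputs/trxn.csv';"
# A particular store, identified by its ID
bacalhau docker run -s storeid=4 -f -i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 "SELECT COUNT(*) FROM '/inputs/trxn.csv';"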
Step 3 - Running a query across the network
Before we can query our data, we need to know its shape, so we will run a query against the transaction data on one of the compute nodes (it doesn’t matter which). Since each compute node has access to transaction data at /node/data/transactions.csv, we can query it using DuckDB. If the data were in a form DuckDB does not understand, we could use any other tool that runs in Docker or WebAssembly, or even implement our own pluggable executor for specific use cases.
We’ll need to set an environment variable to point to our Bacalhau cluster, in this case BACALHAU_API_HOST, which removes the need to pass a command line flag to the Bacalhau CLI on each invocation. As each command we run will also need to access the transaction data, we’ll store that input specification in an environment variable too, to reduce the amount of typing.
export BACALHAU_API_HOST="34.155.152.133"
export TRXN_DATA_INPUT="src=file:/node/data/transactions.csv,dst=/inputs/trxn.csv"
To find the shape of our transaction data, we can run the following command to query the database and print the results to the terminal.
bacalhau \
docker run \
-f -i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 \
"DESCRIBE TABLE '/inputs/trxn.csv';"
Here docker run tells Bacalhau to run a Docker container, -f tells it to stream the output to the terminal, -i sets up the input data, expanso/duckdb-ddw:0.0.1 is the Docker image to run, and the final argument is the query we want to run against the data. In this case, after a short delay, we should see the following output.
Output
column_name,column_type,null,key,default,extra
Invoice,VARCHAR,YES,,,
StockCode,VARCHAR,YES,,,
Description,VARCHAR,YES,,,
Quantity,BIGINT,YES,,,
InvoiceDate,VARCHAR,YES,,,
Price,DOUBLE,YES,,,
"Customer ID",BIGINT,YES,,,
Country,VARCHAR,YES,,,
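Now that we know the schema, we can peek at a few sample rows using the same pattern before writing larger queries (a quick sketch; the column names come from the DESCRIBE output above):
bacalhau \
docker run \
-f -i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 \
"SELECT Invoice, StockCode, Quantity, Price, Country FROM '/inputs/trxn.csv' LIMIT 5;"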
Step 4 - Getting the results
So far, we’ve only run queries that show output in the terminal using the -f flag. In practice, we’ll be running queries with more output, potentially across multiple nodes. In that case we’ll want to publish the results, so that anything the compute task writes to the /outputs folder is made available to you as a file (or files) via the configured publisher. To do this, we use the -p flag to specify a publisher.
As we want to store our output in S3 (or any S3-compatible storage), we have made sure that each of the compute nodes has credentials that allow it to connect to S3. Details on these credential requirements are available in the Bacalhau documentation. In our case, we want to store the output in an S3 bucket called “bacalhau-usecase-distributed-data-warehouse”. To avoid having to type this for each command, we’ll store the full publisher URL in an environment variable, indicating that we also want to include the job ID and the execution ID in the output’s prefix.
export PUBLISHER=s3://bacalhau-usecase-distributed-data-warehouse/{jobID}/{executionID}
We can now specify -p $PUBLISHER in our docker run commands to have the output written to that location.
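As a quick illustration (a hypothetical query, reusing the image and input from Step 3), the following publishes per-country revenue from whichever node Bacalhau selects; anything the query writes under /outputs ends up in the bucket:
bacalhau \
docker run \
-p $PUBLISHER \
-i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 \
"COPY (SELECT Country, ROUND(SUM(Price),2) AS Total FROM '/inputs/trxn.csv' GROUP BY Country) TO '/outputs/results.csv' (HEADER, DELIMITER ',');"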
Step 5 - Working with the data
Now that we’re all set up, we can query our data, using the selector flag (-s) to target specific nodes. For example, to find the total of all transactions in the Paris store, we can run:
bacalhau \
docker run \
-s city=Paris \
-f -i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 \
"SELECT ROUND(SUM(Price),2) as Total FROM '/inputs/trxn.csv';"
This displays the output below:
Total
1620674.31
At this point, we might want to get more data, perhaps a list of all the countries whose customers buy products from our European stores. This time, we want the output to be stored in S3, so we also specify -p $PUBLISHER so that anything we write to /outputs is placed in our bucket.
We now need to write our data to a specific location, which we do with the following command. Note that we need to specify --target=all as we expect the job to run on more than one compute node; without it, Bacalhau will pick only a single node in that region.
bacalhau \
docker run \
-s region=EU \
--target=all \
-p $PUBLISHER \
-i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 \
"COPY
(SELECT DISTINCT(Country) as Country FROM '/inputs/trxn.csv' ORDER BY(Country))
TO '/outputs/results.csv' (HEADER, DELIMITER ',');"
This time, we see different output as Bacalhau shows us a job ID (in this case, 073ab816-9b9e-4dfa-9e90-6c4498aa1de6) and then shows progress as the job runs. Once complete, it tells us how to get the details of the job by running bacalhau describe 073ab816-9b9e-4dfa-9e90-6c4498aa1de6. Doing this shows a lot of output, but the following cut-down snippet shows where the query was run and where the outputs are stored.
State:
CreateTime: "2023-10-17T12:28:03.88046717Z"
Executions:
- ComputeReference: e-7c942a16-420d-4736-809c-1d6676e13a1c
CreateTime: "2023-10-17T12:28:03.902519479Z"
JobID: 073ab816-9b9e-4dfa-9e90-6c4498aa1de6
NodeId: QmfKmkipkbAQu3ddChL4sLdjjcqifWQzURCin2QKUzovex
PublishedResults:
S3:
Bucket: bacalhau-usecase-distributed-data-warehouse
Key: 073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7c942a16-420d-4736-809c-1d6676e13a1c/
StorageSource: s3
...
State: Completed
UpdateTime: "2023-10-17T12:28:07.510247793Z"
Version: 3
- ComputeReference: e-7e346e49-d659-4188-ae04-cf5c28fd963b
CreateTime: "2023-10-17T12:28:03.907178486Z"
JobID: 073ab816-9b9e-4dfa-9e90-6c4498aa1de6
NodeId: QmeD1rESiDtdVTDgekXAmDDqgN9ZdUHGGuMAC77krBGqSv
PublishedResults:
S3:
Bucket: bacalhau-usecase-distributed-data-warehouse
Key: 073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7e346e49-d659-4188-ae04-cf5c28fd963b/
StorageSource: s3
...
State: Completed
UpdateTime: "2023-10-17T12:28:07.890888772Z"
Version: 3
Here we can see the two executions performed on EU nodes, with the bucket and key containing the outputs from each execution. Using the standard Bacalhau structure for outputs, we know that we will find CSV files in our bucket at s3://bacalhau-usecase-distributed-data-warehouse/073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7c942a16-420d-4736-809c-1d6676e13a1c/outputs/results.csv and s3://bacalhau-usecase-distributed-data-warehouse/073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7e346e49-d659-4188-ae04-cf5c28fd963b/outputs/results.csv. Accessing this data requires AWS credentials, a tool to download the files, and a way to merge all of the results into one.
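Done by hand, that might look something like the sketch below, assuming the AWS CLI is installed and configured with credentials that can read the bucket:
BUCKET=bacalhau-usecase-distributed-data-warehouse
JOB_ID=073ab816-9b9e-4dfa-9e90-6c4498aa1de6
# Download the outputs of every execution for this job
aws s3 cp --recursive "s3://$BUCKET/$JOB_ID/" ./results/
# Merge the per-node CSVs, keeping a single header line and de-duplicating rows
head -n 1 "$(find ./results -name results.csv | head -n 1)" > merged.csv
find ./results -name results.csv -exec tail -n +2 {} \; | sort -u >> merged.csv
Rather than burden the user with this work, we can wrap our command line invocations in something less complex.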
Step 6 - Simplifying the interface
The previous sections of this tutorial have shown how to use and configure various Bacalhau features through the Bacalhau command line interface (CLI). While the interface is flexible and allows you to configure work in any way you wish, it does involve a lot of typing, which can be cumbersome in an interactive scenario such as this.
Fortunately, Bacalhau provides an API, used by the command line interface itself, which means anything you can do in the CLI you can also do via the API. This provides even more flexibility in presentation, making it possible to build specialized interfaces for different use cases. As an example of how you can use the Python SDK to build such an interface, take a look at the Distributed Data Warehouse Client, which lets you store commonly used settings in a configuration file. This program of fewer than 200 lines of code lets us move away from querying all regional stores with this rather lengthy command:
$ bacalhau \
docker run \
-s region=EU \
--target=all \
-p $PUBLISHER \
-i $TRXN_DATA_INPUT \
expanso/duckdb-ddw:0.0.1 \
"COPY
(SELECT DISTINCT(Country) as Country FROM '/inputs/trxn.csv' ORDER BY(Country))
TO '/outputs/results.csv' (HEADER, DELIMITER ',');"
We can instead query like the following and get merged results written locally, ready for opening in a spreadsheet or further processing. Much neater and far simpler.
$ poetry run ddw -a -s region=EU "SELECT DISTINCT(Country) as Country FROM '/inputs/transactions.csv' ORDER BY(Country)"
Submitted job: d802752f-e0b1-417a-8b98-55381ce4f7fb
Output written to: output-d802752f.csv
Note: only the first line is our input; the rest is the response from the system.
After checking that you have the dependencies described in the repository, you can install this client to try it out with:
git clone
cd examples/distributed-datawarehouse
poetry install
Whilst this vastly reduces the complexity of the interface, not to mention the amount of typing, it is really just a starting point, beyond which it is possible to imagine a more complete user interface that allows you to recall previous queries and see results in different formats.
Conclusion
Bacalhau’s distributed computing approach enables retailers to address the limitations of centralized data processing. By adopting Bacalhau, retail chains can fully utilize their geographically dispersed data, gaining real-time insights, improving security, and achieving cost savings.
We hope this tutorial has demonstrated how to leverage Bacalhau’s features effectively. We’ve seen how to use labels and selectors to target specific nodes or node groups globally. We also covered how easy it is to switch from built-in storage to S3-compatible options by changing the publisher. Lastly, by utilizing Bacalhau’s robust API and the Python SDK, we can enhance user experiences with straightforward tools where interactivity is essential.