Distributed Data Warehousing with Bacalhau
Expanso helps leading organizations innovate quickly by helping them architect, implement, and maintain enterprise-grade distributed compute platforms. In fact, Expanso is the same team that created and currently maintains the industry-leading open-source software Bacalhau, built to improve cost and efficiency of processing large-scale datasets in distributed environments.
Challenges in traditional data warehousing
Centralized systems struggle to scale with growing data volumes and user demands, which makes scaling them expensive and complex.
Increased user and application access can lead to performance degradation, causing delays in data retrieval and analysis.
HIGH DATA ENGINEERING COSTS
Centralizing data requires significant data engineering efforts, especially when data comes from diverse sources.
Routine maintenance and upgrades can be resource-intensive and disruptive. Upkeep demands significant investments in hardware, software, and skilled personnel.
SLOW TO ADAPT
Adapting to new data sources or schema changes is often slow and complex in centralized architectures.
With many organizations investigating Data Mesh architectures, where data is treated and managed like a product by the teams that generate it, there may be a better way than centralizing this data.
Solution: Distributed data warehousing
As teams treat data as a product, perhaps under a federated governance model, querying data at its origin can bypass costly ETL processes. It’s often just as effective to send the computation to the data rather than the other way around, even outside a data mesh.
By positioning compute nodes close to where systems generate data, you can dispatch computational tasks or queries directly to the data, and then deliver results through the selected storage solution. The results of these computations are usually much smaller than the source datasets, making them easier to transmit and enabling near real-time processing. Backing up required log data is still done conventionally.
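As a rough illustration of why results travel more cheaply than raw data, the sketch below aggregates a tiny, made-up transactions table where it lives and compares the size of the raw rows with the size of the result. The column names and values are hypothetical and are not taken from the dataset used later in this tutorial.

```python
import csv
import io

# Hypothetical raw transactions held at one location (illustrative values only).
RAW_CSV = """StoreID,Country,Quantity,UnitPrice
1,FR,2,3.50
1,FR,1,10.00
1,DE,4,2.25
1,FR,3,1.00
"""


def local_total(raw_csv: str) -> float:
    """Compute the total sales value next to the data."""
    reader = csv.DictReader(io.StringIO(raw_csv))
    return round(sum(int(r["Quantity"]) * float(r["UnitPrice"]) for r in reader), 2)


total = local_total(RAW_CSV)
result_payload = f"Total\n{total}\n"

# Only the small result needs to leave the site, not the raw rows.
print(f"raw bytes: {len(RAW_CSV.encode())}, result bytes: {len(result_payload.encode())}")
```

With real datasets the gap is far larger, since the raw table grows with every transaction while an aggregate stays roughly the same size.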
Implementing a distributed data warehouse offers clear benefits:
Bacalhau as a distributed data warehouse orchestrator
Bacalhau offers a way to tap into the benefits of a distributed data warehouse with minimal changes to existing processes. Many organizations already have the necessary data and compute resources for analytics, spread across various databases, servers, and edge locations.
With Bacalhau, these scattered resources can be harnessed to form a dynamic data warehouse. Installing its lightweight agents where the data already exists allows compute jobs to run on-site. This means there’s no need to move large datasets around or to make major changes to ETL processes and data models.
FLEXIBLE COMPUTE NODES
Bacalhau’s compatibility with Docker and WebAssembly allows for a wide variety of workloads to be run efficiently. Its compute nodes are not only open for custom execution engines but are also versatile enough to support everything from .NET applications to legacy IBM AS/400 systems.
DIVERSE DATA ACCESS
To cut the high costs of data transfer, Bacalhau facilitates direct access to local data, reducing the need for extensive data movement. It supports S3-compatible storage, IPFS, and direct local storage, making data readily available for computation.
SMART JOB ALLOCATION
With Bacalhau, you can manage how jobs are assigned to compute nodes with precision. Nodes can be targeted for specific jobs using labels that denote their characteristics, and the latest platform updates allow for even more nuanced selections based on these labels.
Consider a retail chain with multiple stores spread across different regions. Each store has its own POS system collecting sales data. Traditionally, this data is batch-uploaded to a central data warehouse at the end of each day for processing and report generation.
With Bacalhau, each store installs a compute node that processes data locally. A control plane node orchestrates tasks across the compute nodes, distributing work based on various selection criteria. Retailers can now query data in near real-time, gaining instant insights without the need for extensive data transfer or central processing infrastructure.
Step 0 - Prerequisites
Before you start, you’ll need:
- Storage Solution: Have a storage provider or location ready for the job results.
- Firewall Configuration: Adjust your firewall settings so your node can talk to the rest of the Bacalhau network.
- Hosting Setup: Prepare a physical server, a virtual machine, or a cloud-based instance. Note that Bacalhau compute nodes should not be run inside a Docker container.
- Bacalhau CLI: Install the Bacalhau CLI on your local machine, following the instructions provided.
Step 1 - Provisioning hardware
- Control Plane Node: This is your operation’s headquarters, coordinating tasks throughout your network.
  - Instances: 1
  - Disk Space: 25-100GB
  - CPU: 1-8 cores (vCPU)
  - RAM: 4-16GB
- Compute Node(s): These workhorses run your code and access data that’s local or close to it.
  - Instances: 1-N (We’ll use 4 in this example, symbolizing 4 different locations)
  - Disk Space: 32-500GB
  - CPU: 1-8 cores (vCPU)
  - RAM: 4-16GB
Note: It’s crucial that the Control Plane Node can communicate with the Compute Nodes. For guidance on this, you can follow this tutorial.
Step 2 - Installing the requester and compute nodes
Bacalhau uses a node called a requester node to orchestrate jobs in the network, communicating with the compute nodes to distribute work according to the various selection criteria. Once you have installed Bacalhau, you can run the requester node as follows.
- Set Environment Variables: Record the environment variables from the requester node and apply them to each machine designated as a compute node.
- Install Compute Nodes: At each store location with data, you’ll need to install a Bacalhau compute node on a machine with data access. Use the standard Bacalhau installation instructions, but for a private network, you’ll specify a unique --peer value.
- Job Distribution: To distribute jobs to the compute nodes, you have two main strategies:
  - Job Selection Policies: Implement custom logic to decide if a node should run a job. More details can be found in the job selection policy documentation.
  - Node Labels: For a simpler approach, use node labels to target specific nodes or groups of nodes. This is the recommended method for this guide.
Note the addition of labels, which allow us to target specific nodes when we run our jobs. Here we add a store identifier, a region, a country and a city so that we can target queries in our warehouse to any of these labels. In reality, we may add more metadata here to provide even more flexibility in precisely targeting stores by one or more of these labels.
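For reference, the node start-up commands this step describes might be assembled as in the sketch below. It only builds and prints the command lines; nothing is executed. The `--node-type` and `--labels` flags reflect our understanding of `bacalhau serve` in Bacalhau 1.x and should be checked against `bacalhau serve --help` for your version. The peer address, label keys, and label values are hypothetical placeholders.

```python
import shlex

# Hypothetical placeholder: use the peer address reported by your requester node.
REQUESTER_PEER = "<requester-peer-address>"


def requester_command() -> list[str]:
    """Command line for the requester (control plane) node. Flag is an assumption."""
    return ["bacalhau", "serve", "--node-type", "requester"]


def compute_command(peer: str, labels: dict[str, str]) -> list[str]:
    """Command line for a compute node that joins `peer` and carries `labels`."""
    label_arg = ",".join(f"{key}={value}" for key, value in labels.items())
    return ["bacalhau", "serve", "--node-type", "compute",
            "--peer", peer, "--labels", label_arg]


# Hypothetical labels for one store; real deployments would use their own values.
store_labels = {"storeid": "1", "region": "EU", "country": "FR", "city": "Paris"}

print(shlex.join(requester_command()))
print(shlex.join(compute_command(REQUESTER_PEER, store_labels)))
```

Keeping the labels in a dictionary makes it straightforward to generate one command per store from an inventory file.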
For this example, we’ll use nodes with the following labels:
|3||EU||DE||United States of America|
By strategically applying labels, you can direct jobs to:
- All stores, regardless of location.
- A single store within a specific region.
- All stores within a particular region.
- A single store within a given country.
- All stores across a specific country.
- A particular set of stores identified by their IDs.
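The targeting options above can be pictured with a small simulation. This is not Bacalhau’s scheduler, only a sketch of how key/value selectors narrow down a set of labelled nodes; the node names and label values are hypothetical.

```python
# Hypothetical node inventory; labels mirror the kinds of metadata described above.
NODES = {
    "node-a": {"storeid": "1", "region": "EU", "country": "FR", "city": "Paris"},
    "node-b": {"storeid": "2", "region": "EU", "country": "DE", "city": "Berlin"},
    "node-c": {"storeid": "3", "region": "NA", "country": "US", "city": "New York"},
}


def matches(labels, selector):
    """True when every selector entry is satisfied by the node's labels.

    A selector value may be a single string or a set of accepted strings.
    """
    for key, wanted in selector.items():
        value = labels.get(key)
        if isinstance(wanted, (set, frozenset)):
            if value not in wanted:
                return False
        elif value != wanted:
            return False
    return True


def select(nodes, selector, target_all=True):
    """Return matching node names, or just one of them when target_all is False."""
    hits = sorted(name for name, labels in nodes.items() if matches(labels, selector))
    return hits if target_all else hits[:1]


print(select(NODES, {}))                              # all stores
print(select(NODES, {"region": "EU"}))                # all stores in a region
print(select(NODES, {"region": "EU"}, False))         # a single store in a region
print(select(NODES, {"storeid": {"1", "3"}}))         # a set of stores by ID
```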
Step 3 - Running a query across the network
Before we can query our data, we need to know what shape it has, so we want to run a query against the transaction data on one of the compute nodes (it doesn’t matter which). As we know that each compute node has access to transaction data at /node/data/transactions.csv, we can query it using DuckDB. If the data were made available in a form that DuckDB does not understand, we could use any other tool that works with Docker or WebAssembly, or even implement our own pluggable executor to support specific use-cases.
We’ll need to set an environment variable to point to our Bacalhau cluster, in this case by specifying BACALHAU_CLIENT_API_HOST as this will remove the need to provide a command line flag to the Bacalhau program on each invocation. As each command we run will also need to access the transactions database, we’ll also store that in an environment variable to reduce the amount of typing necessary.
To find the shape of our transaction data, we can run the following command to query the database and print the results to the terminal.
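A sketch of what such an invocation could look like, assembled in Python so the individual pieces are easy to see. Only the flags, the image name, and the source path come from this tutorial. The API host value, the `src=...,dst=...` form of the `-i` argument, the mount path `/inputs/transactions.csv`, and the `DESCRIBE` query are assumptions for illustration; check `bacalhau docker run --help` for the input syntax your version accepts.

```python
import os
import shlex

# Hypothetical value: replace with the address of your requester node.
os.environ["BACALHAU_CLIENT_API_HOST"] = "<requester-host>"

# Assumed input form and mount path (not specified in this tutorial).
TRANSACTIONS = "src=file:///node/data/transactions.csv,dst=/inputs/transactions.csv"

# DuckDB can describe the columns of a CSV file it reads directly.
query = "DESCRIBE SELECT * FROM '/inputs/transactions.csv';"

command = ["bacalhau", "docker", "run", "-f", "-i", TRANSACTIONS,
           "expanso/duckdb-ddw:0.0.1", query]

# Print the command rather than running it, so it can be reviewed first.
print(shlex.join(command))
```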
Here docker run is telling Bacalhau to run a docker container, -f tells it to log output to the terminal, -i sets up the input data, expanso/duckdb-ddw:0.0.1 is the docker container to run, and the final section is the query we want to run against the data. In this case, after a short delay, we should see the following output.
Step 4 - Getting the results
So far, we’ve only run queries that show output to the terminal using the -f flag. In practice, we’ll be running queries with more output, and potentially across multiple nodes. In this case we’ll want to publish the results, so that anything the compute task writes to the /outputs folder is made available to you in the terminal as a file (or files). To do this, we use the -p flag to specify a publisher.
As we want to store our output in S3 (or any S3-compatible storage), we have made sure that each of the compute nodes has credentials that allow it to connect to S3. Details on these credential requirements are available in the Bacalhau documentation. In our case, we want to store the output in an S3 bucket called “bacalhau-usecase-distributed-data-warehouse”. To avoid having to type this for each command, we’ll store the full publisher URL in an environment variable, showing we want to also include the job id, and the execution id in the output’s prefix.
We can now specify -p $PUBLISHER in our docker run commands to have the output written to that location.
Step 5 - Working with the data
Now that we’re all set up, we can query our data. We can use the selector flag (-s) to target specific nodes. For instance, to find the total of all transactions in the Paris store, we can run:
This displays the output below:
At this point, we might want to get more data, perhaps a list of all the countries who buy products from our European stores. This time, we want the output to be stored in S3, and so we also specify -p $PUBLISHER so that if we write to /outputs then the data will be put into our bucket.
We now need to write out data to a specific location, and so we will do that with the following command. Note that we need to specify --target=all as we expect it to run on more than one compute node. Without this it will pick only a single node in that region.
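The two kinds of query in this step differ only in their flags, which the sketch below makes explicit by building both command lines with one helper. The label keys (`city`, `region`), the column names, the input form and mount path, and the `{jobID}`/`{executionID}` placeholders in the publisher URL are assumptions for illustration; the flags, image name, and bucket name are the ones used in this tutorial.

```python
import shlex

IMAGE = "expanso/duckdb-ddw:0.0.1"
# Assumed input form and mount path (not specified in this tutorial).
TRANSACTIONS = "src=file:///node/data/transactions.csv,dst=/inputs/transactions.csv"
# Assumed placeholder syntax for including job and execution IDs in the prefix.
PUBLISHER = "s3://bacalhau-usecase-distributed-data-warehouse/{jobID}/{executionID}"


def docker_run(query, selector=None, publisher=None, target_all=False, follow=False):
    """Build a `bacalhau docker run` command line from the flags used in this guide."""
    cmd = ["bacalhau", "docker", "run"]
    if follow:
        cmd.append("-f")            # stream output to the terminal
    if selector:
        cmd += ["-s", selector]     # restrict to nodes with matching labels
    if target_all:
        cmd.append("--target=all")  # run on every matching node, not just one
    if publisher:
        cmd += ["-p", publisher]    # publish whatever is written to /outputs
    cmd += ["-i", TRANSACTIONS, IMAGE, query]
    return cmd


# Hypothetical queries; the column names are assumed.
paris = docker_run(
    "SELECT SUM(Quantity * UnitPrice) AS Total FROM '/inputs/transactions.csv';",
    selector="city=Paris", follow=True)
europe = docker_run(
    "COPY (SELECT DISTINCT Country FROM '/inputs/transactions.csv') "
    "TO '/outputs/results.csv' (HEADER, DELIMITER ',');",
    selector="region=EU", publisher=PUBLISHER, target_all=True)

print(shlex.join(paris))
print(shlex.join(europe))
```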
This time, we see different output as Bacalhau shows us a job ID (in this case, 073ab816-9b9e-4dfa-9e90-6c4498aa1de6) and then shows progress as the job is happening. Once complete it tells us how we can get the details of the job, by running bacalhau describe 073ab816-9b9e-4dfa-9e90-6c4498aa1de6. Doing this shows lots of output, but the following cut-down snippet shows information on where the query was run, and where the outputs are stored.
Here we can see the two executions performed on EU nodes, with the bucket and key containing the outputs from our execution. Using the standard Bacalhau structure for outputs, we know that we will find CSV files in our bucket at s3://bacalhau-usecase-distributed-data-warehouse/073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7c942a16-420d-4736-809c-1d6676e13a1c/outputs/results.csv and s3://bacalhau-usecase-distributed-data-warehouse/073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7e346e49-d659-4188-ae04-cf5c28fd963b/outputs/results.csv. Accessing this data requires AWS credentials, a tool to download the data, and a way to merge all of the results into one. Rather than burden the user with this work, we can wrap our command line invocations in something less complex.
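To make the merging step concrete, here is a minimal sketch that derives the result locations from a job ID and its execution IDs, then merges per-execution CSV files into one. The download itself is left out; the CSV contents below are hypothetical stand-ins for the downloaded files, and dropping duplicate rows is a design choice that suits a "distinct countries" style query rather than a general rule.

```python
import csv
import io

BUCKET = "bacalhau-usecase-distributed-data-warehouse"


def result_key(job_id: str, execution_id: str) -> str:
    """Object key for one execution's results, following the layout shown above."""
    return f"{job_id}/{execution_id}/outputs/results.csv"


def merge_csv(parts: list[str]) -> str:
    """Merge CSV documents that share a header: keep the header once and
    drop duplicate rows while preserving first-seen order."""
    header, seen, rows = None, set(), []
    for text in parts:
        reader = csv.reader(io.StringIO(text))
        part_header = next(reader, None)
        if part_header is None:
            continue  # empty file, nothing to merge
        if header is None:
            header = part_header
        elif part_header != header:
            raise ValueError("result files have different headers")
        for row in reader:
            if row and tuple(row) not in seen:
                seen.add(tuple(row))
                rows.append(row)
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    if header is not None:
        writer.writerow(header)
    writer.writerows(rows)
    return out.getvalue()


job = "073ab816-9b9e-4dfa-9e90-6c4498aa1de6"
executions = ["e-7c942a16-420d-4736-809c-1d6676e13a1c",
              "e-7e346e49-d659-4188-ae04-cf5c28fd963b"]
for execution in executions:
    print(f"s3://{BUCKET}/{result_key(job, execution)}")

# Hypothetical contents of the two downloaded files.
merged = merge_csv(["Country\nFR\nDE\n", "Country\nDE\nES\n"])
print(merged)
```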
Step 6 - Simplifying the interface
The previous sections of this tutorial have shown how to use, and specify, various Bacalhau features using the Bacalhau command line interface (CLI). While the interface is flexible and allows you to configure work in any way you wish, it does involve a lot of typing that might be overwhelming in an interactive scenario such as this.
Fortunately, Bacalhau provides an API, used by the command line interface, which means anything you can do in the CLI, you can do via its API. This provides even more flexibility in presentation, making it possible to build specialized interfaces for different use-cases. As an example of how you can use the Python SDK to build a specialized interface, you can take a look at the Distributed Data Warehouse Client, which allows you to store commonly keyed information in a configuration file. This program of fewer than 200 lines of code lets us move away from querying all regional stores with a rather lengthy command such as this.
Instead, we can run a query like the following and get merged results written locally, ready for opening in a spreadsheet or further processing. This is considerably simpler.
Note: Only the first line is our input; the rest is the response from the system itself.
After checking you have the dependencies described in the repository you can install this client to try it out with
Whilst this vastly reduces the complexity of the interface, not to mention the amount of typing, it is really just a starting point, beyond which it is possible to imagine a more complete user interface that allows you to recall previous queries and see results in different formats.
Bacalhau’s distributed computing approach empowers retailers to overcome the challenges associated with centralized data processing. By deploying Bacalhau, retail chains can harness the full potential of their geographically dispersed data, enabling real-time insights, enhanced security, and cost savings.
Hopefully this tutorial has shown how we can combine individual Bacalhau features to achieve our goal. It has shown how we can use labels and selectors to target single nodes or groups of nodes distributed across the globe, and how changing the publisher responsible for disseminating the results makes it easy to switch from built-in storage to S3-compatible options. Finally, it has shown how, by taking advantage of Bacalhau’s API and the Python SDK, we can provide a different experience with simple tools where interactivity is required.