Demo code for processing ordered events in Apache Beam pipelines.
This repo contains a simulation of Order Book processing in streaming and batch Apache Beam pipelines. It shows how ordered processing can be done in Apache Beam at scale, and provides a fully functional pipeline, a simulator test harness, and a set of scripts to visualize the processing steps and the output of the pipeline.
The use case is maintaining an order book of security order events (buy, sell or cancellation) and producing the security's market depth on every trade.
The market depth data can be saved to persistent storage for additional analysis, or it can be analyzed in the same pipeline to build a streaming analytics solution.
The use case is implemented as a standalone Java module (business model), with the core logic residing in the OrderBookBuilder class. The simulator module has utilities to generate order book events that simulate financial institution trading sessions.
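To make the use case concrete, here is a minimal sketch of an order book that aggregates resting quantity per price level and reports market depth. It is not the repository's OrderBookBuilder: the class name `OrderBookSketch`, its methods, and the `quantity@price` output format are all hypothetical, prices are assumed to be integer ticks, and trade matching is left out entirely.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified order book; NOT the repository's OrderBookBuilder. */
class OrderBookSketch {
    enum Side { BUY, SELL }

    /** A resting order: side, limit price (assumed integer ticks) and quantity. */
    private static final class Order {
        final Side side;
        final long price;
        final long quantity;

        Order(Side side, long price, long quantity) {
            this.side = side;
            this.price = price;
            this.quantity = quantity;
        }
    }

    private final Map<Long, Order> ordersById = new HashMap<>();
    // Price level -> total quantity. Bids iterate from the highest price, asks from the lowest.
    private final TreeMap<Long, Long> bids = new TreeMap<>(Comparator.<Long>reverseOrder());
    private final TreeMap<Long, Long> asks = new TreeMap<>();

    private TreeMap<Long, Long> levels(Side side) {
        return side == Side.BUY ? bids : asks;
    }

    /** Adds a buy or sell order to the book (no matching is attempted in this sketch). */
    void add(long orderId, Side side, long price, long quantity) {
        ordersById.put(orderId, new Order(side, price, quantity));
        levels(side).merge(price, quantity, Long::sum);
    }

    /** Cancels an order; unknown or already cancelled ids are ignored. */
    void cancel(long orderId) {
        Order order = ordersById.remove(orderId);
        if (order == null) {
            return;
        }
        levels(order.side).computeIfPresent(order.price, (price, total) -> {
            long remaining = total - order.quantity;
            return remaining > 0 ? Long.valueOf(remaining) : null; // null drops the empty level
        });
    }

    /** Returns up to maxLevels best price levels, each formatted as "quantity@price". */
    List<String> depth(Side side, int maxLevels) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Long, Long> level : levels(side).entrySet()) {
            if (result.size() == maxLevels) {
                break;
            }
            result.add(level.getValue() + "@" + level.getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        OrderBookSketch book = new OrderBookSketch();
        book.add(1, Side.BUY, 100, 10);
        book.add(2, Side.BUY, 101, 5);
        book.add(3, Side.SELL, 103, 4);
        System.out.println("bids: " + book.depth(Side.BUY, 2));
        System.out.println("asks: " + book.depth(Side.SELL, 2));
    }
}
```

Sorted maps keep the best bid and best ask at the front of each side, so reading the top N levels is a short iteration. A real order book would also match crossing orders and emit trades, which this sketch does not attempt.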
The pipeline uses Beam's state and timers to process events in order. For a detailed description of the steps needed to implement the pipeline, see this document.
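The general idea of processing events in order can be sketched in plain Java without any Beam API: events that arrive early are buffered until the next expected sequence number shows up, and then the contiguous run is released. This is only an illustration under stated assumptions, not the pipeline's implementation. The class `SequenceBufferSketch` is hypothetical, sequence numbers are assumed to start at 1 with no gaps, timers are not modeled, and the counters only borrow their names from the `processing_status` columns queried later in this README; their exact semantics in the pipeline may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical per-contract sequencer; in a Beam pipeline this data would live in per-key state. */
class SequenceBufferSketch {
    // Events that arrived ahead of their turn, keyed by sequence number.
    private final TreeMap<Long, String> buffered = new TreeMap<>();
    // Assumption: sequence numbers start at 1 and have no gaps.
    private long nextSequence = 1;

    long receivedCount;
    long duplicateCount;
    long resultCount;

    /** Accepts one event and returns the events that can now be emitted in order. */
    List<String> onEvent(long sequence, String payload) {
        receivedCount++;
        List<String> ready = new ArrayList<>();

        // Already emitted or already waiting in the buffer: count it and drop it.
        if (sequence < nextSequence || buffered.containsKey(sequence)) {
            duplicateCount++;
            return ready;
        }

        buffered.put(sequence, payload);

        // Release the contiguous run that starts at the next expected sequence number.
        while (!buffered.isEmpty() && buffered.firstKey() == nextSequence) {
            ready.add(buffered.pollFirstEntry().getValue());
            nextSequence++;
        }
        resultCount += ready.size();
        return ready;
    }

    /** Number of events still waiting for an earlier event to arrive. */
    int bufferedCount() {
        return buffered.size();
    }
}
```

For example, feeding sequence numbers 2, 3 and then 1 emits nothing for the first two calls and all three payloads, in order, on the third. A contract is caught up when nothing remains in the buffer.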
- Clone this repo and switch to the checked out directory
- Designate or create a project to run the tests and create a
  terraform/terraform.tfvars file with the following content:

      project_id = "<your project id>"

- Create infrastructure to run the demo:

      cd terraform
      terraform init
      terraform apply
      cd ..

- Build the project:

      mvn clean install

This pipeline was tested using JDK 11. If you have multiple JDKs installed, please set
the JAVA_HOME environment variable to point to the right JDK.
Start the pipeline:

    ./start-pipeline.sh

Then start the simulator, which generates synthetic orders and expected order book events:

    ./run-pubsub-simulator.sh

Once the pipeline is running, you can use the BigQuery console or the bq utility to see how the pipeline
processes the data.
To see the processing state for the latest session:

    WITH latest_statuses AS (
        -- Stats for each contract
        SELECT s.received_count,
               s.buffered_count,
               s.result_count,
               s.duplicate_count,
               s.last_event_received
        FROM `ordered_processing_demo.processing_status` s
        WHERE
            -- Find latest session_id
            session_id = (SELECT DISTINCT session_id
                          FROM `ordered_processing_demo.processing_status`
                          ORDER BY session_id DESC
                          LIMIT 1)
        -- Most recent stats by status_id across contract_id
        QUALIFY RANK() OVER (PARTITION BY contract_id
                             ORDER BY status_ts DESC, received_count DESC) = 1)
    SELECT COUNT(*)                                         total_contracts,
           COUNTIF(last_event_received
                   AND buffered_count = 0)                  fully_processed,
           SUM(received_count)                              total_orders_received,
           SUM(buffered_count)                              total_orders_buffered,
           SUM(result_count)                                total_results_produced,
           SUM(duplicate_count)                             total_duplicates
    FROM latest_statuses;

This query shows the last 5 processing statuses per contract for the latest session:

    SELECT *
    FROM `ordered_processing_demo.processing_status`
    WHERE session_id = (SELECT DISTINCT session_id
                        FROM `ordered_processing_demo.processing_status`
                        ORDER BY session_id DESC LIMIT 1)
    QUALIFY RANK() OVER (PARTITION BY contract_id
                         ORDER BY status_ts DESC, received_count DESC) <= 5
    ORDER BY contract_id, status_ts DESC, received_count DESC LIMIT 300

This query shows the last 5 market depth records per contract for the latest session:

    SELECT *
    FROM `ordered_processing_demo.market_depth`
    WHERE session_id = (SELECT DISTINCT session_id
                        FROM `ordered_processing_demo.market_depth`
                        ORDER BY session_id DESC LIMIT 1)
    QUALIFY RANK() OVER (PARTITION BY contract_id
                         ORDER BY contract_sequence_id DESC) <= 5
    ORDER BY contract_id, contract_sequence_id DESC LIMIT 300

The next query measures market depth latencies: the difference between the time the data was generated in the simulator and the time it became available for querying in BigQuery. A number of factors affect these latencies: delays in publishing events via Pub/Sub, Dataflow autoscaling events, the percentage of events processed out of order, and the frequency of flushes in BigQueryIO.

    WITH last_session_latencies AS (
        SELECT APPROX_QUANTILES(TIMESTAMP_DIFF(ingest_ts, event_ts, SECOND), 100) quantiles
        FROM `ordered_processing_demo.market_depth`
        WHERE session_id = (SELECT DISTINCT session_id
                            FROM `ordered_processing_demo.processing_status`
                            ORDER BY session_id DESC
                            LIMIT 1)
    )
    SELECT quantiles[OFFSET(0)]   AS min,
           quantiles[OFFSET(20)]  AS percentile20,
           quantiles[OFFSET(50)]  AS median,
           quantiles[OFFSET(90)]  AS percentile90,
           quantiles[OFFSET(100)] AS max
    FROM last_session_latencies

Order latencies are measured in a similar way to the market depth performance, but in this case orders are saved directly into BigQuery:

    WITH last_session_latencies AS (
        SELECT APPROX_QUANTILES(TIMESTAMP_DIFF(ingest_ts, event_ts, SECOND), 100) quantiles
        FROM `ordered_processing_demo.order_event`
        WHERE session_id = (SELECT DISTINCT session_id
                            FROM `ordered_processing_demo.processing_status`
                            ORDER BY session_id DESC
                            LIMIT 1)
    )
    SELECT quantiles[OFFSET(0)]   AS min,
           quantiles[OFFSET(20)]  AS percentile20,
           quantiles[OFFSET(50)]  AS median,
           quantiles[OFFSET(90)]  AS percentile90,
           quantiles[OFFSET(100)] AS max
    FROM last_session_latencies

You can run multiple simulator runs, sequentially or in parallel, to see how the pipeline works. Once you are done, stop the pipeline:

    ./stop-pipeline.sh

To test pipeline performance using different numbers of contracts, total numbers of orders, and different pipeline parameters:

    ./run-perf-test.sh <number-of-contracts> <total-number-of-orders> <number-of-initial-workers> <disable-horizontal-autoscaling>

The first three parameters are required. Any value passed in the fourth parameter will disable autoscaling.
The script will start the pipeline, wait for the workers to be ready to process the data, run the
simulator, and wait for processing to complete. It will then shut down the pipeline. The results
of the test run are appended to the test-log.txt file, e.g.:

    2024-01-12 09:47:08 - session 2024-01-12.09:36 with 50056466 events for 3000 contracts processed in 657 seconds by pipeline 2024-01-12_09_33_55-3943530097295374913.

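
To compare runs, it can help to turn a log line like the one above into a throughput figure. The following is a rough sketch, assuming every line in test-log.txt follows the wording of that single sample; the class `PerfLogSketch` and its regular expression are hypothetical and are not part of the repository's scripts.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that derives events per second from a test-log.txt line. */
class PerfLogSketch {
    // Assumption: the pattern is inferred from one sample line; other lines may be worded differently.
    private static final Pattern LINE = Pattern.compile(
            "with (\\d+) events for (\\d+) contracts processed in (\\d+) seconds");

    /** Returns events per second, or -1 if the line does not match or reports zero seconds. */
    static double eventsPerSecond(String logLine) {
        Matcher matcher = LINE.matcher(logLine);
        if (!matcher.find()) {
            return -1;
        }
        long events = Long.parseLong(matcher.group(1));
        long seconds = Long.parseLong(matcher.group(3));
        return seconds == 0 ? -1 : (double) events / seconds;
    }

    public static void main(String[] args) {
        String sample = "2024-01-12 09:47:08 - session 2024-01-12.09:36 with 50056466 events "
                + "for 3000 contracts processed in 657 seconds by pipeline "
                + "2024-01-12_09_33_55-3943530097295374913.";
        System.out.printf("%.0f events/s%n", eventsPerSecond(sample));
    }
}
```

For the sample line, 50056466 events in 657 seconds works out to roughly 76,000 events per second. That figure covers the whole processing window the script reports, so it is only a coarse basis for comparing runs.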
To destroy the infrastructure created for the demo:

    terraform -chdir=terraform destroy

Contributions to this repo are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.
Apache 2.0 - See LICENSE for more information.