Writing your own client
How to write an Electric client for any language that speaks HTTP and JSON.
HTTP and JSON
You can create a client for Electric by:
- implementing a long-polling strategy to consume the HTTP API
- (optionally) materialising the shape log into a data structure or local store
- (optionally) providing reactivity bindings
Before you start
It's worth looking through the source code for the existing Typescript and Elixir clients. You're also welcome to raise an issue on GitHub and flag up your plans on Discord.
Your client also needs to be able to talk to a running instance of Electric.
Consume the HTTP API
The Electric sync service syncs data over an HTTP API. The primary job of a client is to consume this API using HTTP requests.
The HTTP API exposes Shapes. There are two phases to syncing a shape:
- initial sync where you load all the data the server is currently aware of
- live mode where you wait for and consume live updates in real-time
Initial sync
In the initial sync phase, you make a series of requests to get Shape data, increasing the offset
parameter until you get an frontier
message.
Construct your shape URL
Encode a shape definition into a GET /v1/shape
URL. See the specification for the URL structure here. For example, a Shape that contains all of the rows in the items
table would be requested with:
GET /v1/shape?table=items
Make the initial offset=-1
request
The first request to a shape should set the offset
parameter to -1
. This indicates to Electric that you want to consume all of the data from the beginning of the Shape log. For example, you might make a request to:
GET /v1/shape?table=items?offset=-1
The body of the response will contain a JSON array of messages. The headers of the response will contain two pieces of important information:
electric-handle
an ephemeral identifier to an existing shape logelectric-offset
the offset value for your next request
If the last message in the response body contains an frontier
control message:
{"headers":{"control":"frontier"}}
Then the response will also contain an:
electric-frontier
header
Either of which indicate that you can process the messages and switch into live mode. Otherwise, you should continue to accumulate messages by making additional requests to the same URL, with the new shape handle and offset. For example:
GET /v1/shape?table=items&handle=38083685-1729874417404&offset=0_0
In this way, you keep making GET requests with increasing offsets until you load all the data that the server is aware of, at which point you get the frontier
message.
Live mode
In live mode, if the server doesn't have any new data, it will hold open your request until either a timeout or new data arrives. This allows you to implement long polling, where you keep the request open, and reconnect immediately when new data arrives.
Why not websockets?!
Consuming data over HTTP allows us to leverage CDNs, simplifies observability and allows you to implement auth (and other capabilities) using HTTP proxies.
Add live
and cursor
parameters
Set live=true
to switch Electric into live mode. Make sure your request timeout is higher than the server timeout (which defaults to 20s
)
If the previous response contains an electric-cursor
header, then also set the cursor
parameter to its value. (This is an extra cache-busting parameter used to normalise request-collapsing behaviour across different CDNs).
For example:
GET /v1/shape?table=items&handle=38083685-1729874417404&offset=27344208_0&cursor=1674440&live=true
Keep polling
Live requests will either timeout, returning 204 No content
, or will return an array of messages and headers, just as with non live responses.
Keep pooling and whenever you get new data with an frontier
header/message then process the messages.
Materialise the shape log
How you choose to process shape log messages is up-to you. You can:
- stream the shape log messages through
- materialise the shape log into a data structure or database
Streaming messages
If you just want a stream of logical database operations, you can simply stream or broadcast these onwards. This is what both the Typescript client ShapeStream
class and Elixir client stream/3
function do.
Into a data structure
If you want to maintain a materialised Shape in your client, you can apply the operations in the shape log to a data structure. This is what both the Typescript client Shape
class and Redis example do.
Shape log messages are either control messages or logical insert
, update
or delete
operations. You can materialise a Shape by applying these to your chosen data structure. For example, for a Javascript Map
:
switch (message.headers.operation) {
case `insert`:
data.set(message.key, message.value)
break
case `update`:
data.set(message.key, {
...data.get(message.key)!,
...message.value,
})
break
case `delete`:
data.delete(message.key)
break
}
Into a database
As well as just a single data structure, it's possible to materialise one or more shapes into a local store. This can be very simple -- just update entries in a normalised store, no matter which shape they came through -- or can be complex, when aiming to maintain database invariants in a local embedded database such as PGlite.
Syncing into a database is out of scope of this guide
If you're interested in implementing it, raise an Issue or ask on Discord.
Transactions
Only apply logical operations to your materialised structure when you get an frontier
message. Then either apply that batch of operations to your data structure or store atomically, for example using some kind of transactional application primitive, or only trigger reactivity once all the changes are applied.
Reactivity bindings
If you maintain a materialised data structure, it's often useful to know when it changes. This is what the Typescript client's Shape.subscribe
function enables, for example.
This can then be used by a framework to trigger re-rendering. See the useShape
React hook source code for a real example but in short, e.g.: for a React component:
import { useEffect, useState } from 'react'
const MyComponent = ({ shapeDefinition }) => {
const [ data, setData ] = useState([])
useEffect(() => {
const stream = new ShapeStream(shapeDefinition)
const shape = new Shape(stream)
shape.subscribe(setData)
return () => {
shape.unsubscribe()
}
}, [shapeDefinition])
}
How you choose to provide this kind of API is very language dependent. You could support registering callbacks (like shape.subscribe
) and then call these whenever you've finished materialising your shape, or you could some kind of broadcast mechanism.
Examples
Let's walk through the process of implementing a client in a real programming language.
Brainfuck
++++++++[>++++++++++>++++++++++++++>+++++++++++++++>++++>+++++++>+++++<<<<<<-]>-.>--.--.>+.>.<<--.+++++.----.--.+++++.-------.>>.>+++.>+.
Python
Let's build a simple happy-path client in Python to materialise a Shape into a dict
. First create a new folder and make it a Python package:
mkdir example-client
cd example-client
touch __init__.py
Install the Requests HTTP client:
# Optionally in a virtualenv:
# virtualenv .venv
# source .venv/bin/activate
python -m pip install requests
Now let's write our Shape
client, saving the following in client.py
:
import requests
from urllib.parse import urlencode
class Shape(object):
"""Syncs a shape log and materialises it into a `data` dict."""
def __init__(self, base_url='http://localhost:3000', offset=-1, handle=None, table=None, where=None):
if table is None:
raise "Must provide a table"
# Request state used to build the URL.
self.base_url = base_url
self.cursor = None
self.handle = handle
self.live = False
self.offset = offset
self.table = table
self.where = where
# Materialiased data.
self.data = {}
# Accumulated messages (waiting for an `frontier` to apply).
self.messages = []
# Registered callbacks to notify when the data changes.
self.subscribers = []
def subscribe(self, callback):
"""Register a function that's called whenever the data changes."""
self.subscribers.append(callback)
def sync(self):
"""Start syncing. Note that this blocks the current thread."""
while True:
self.request()
def request(self):
"""Make a request to `GET /v1/shape` and process the response."""
# Build the URL based on the current parameters.
url = self.build_url()
# Fetch the response.
response = requests.get(url)
# This is a happy path example, so we just log error codes.
# A real client should handle errors, backoff, reconnect, etc.
if response.status_code > 204:
raise Exception("Error: {}".format(response.status_code))
# If the response is 200 then we may have new data to process.
if response.status_code == 200:
self.messages.append(response.json())
# If we've recieved a frontier control message, switch into
# live mode and process the accumulated change messages.
if 'electric-frontier' in response.headers:
self.live = True
self.process_messages()
# Set the shape handle, offset and optionally cursor for
# the next request from the response headers.
self.handle = response.headers['electric-handle']
self.offset = response.headers['electric-offset']
if 'electric-cursor' in response.headers:
self.cursor = r.headers['electric-cursor']
def process_messages(self):
"""Process any batched up messages. If the data has changed,
notify the subscribers.
"""
has_changed = False
# Process the accumulated messages.
for batch in self.messages:
for message in batch:
if 'operation' in message.get('headers', {}):
op_changed = self.apply_operation(message)
if op_changed:
has_changed = True
# Clear the queue.
self.messages = []
# If the data has changed, notify the subscribers.
if has_changed:
self.notify_subscribers()
def apply_operation(self, message):
"""Apply a logical operation message to the data dict.
Return whether the data has changed.
"""
key = message['key'].replace('"', '').split("/")[-1]
value = message.get('value')
operation = message['headers']['operation']
if operation == 'insert':
self.data[key] = value
return True
if operation == 'update':
has_changed = False
current_value = self.data[key]
for k, v in value:
if current_value.get(k) != v:
has_changed = True
current_value.update(new_value)
return has_changed
if operation == 'delete':
if key in self.data:
del self.data[key]
return True
return False
def notify_subscribers(self):
for callback in self.subscribers:
callback(self.data)
def build_url(self):
params = {
'offset': self.offset,
'table': self.table
}
if self.cursor is not None:
params['cursor'] = self.cursor
if self.handle is not None:
params['handle'] = self.handle
if self.live:
params['live'] = True
if self.where is not None:
params['where'] = self.where
return "{}/v1/shape?{}".format(self.base_url, urlencode(params))
Now let's create a test file to test running the client. Save the following in client.test.py
:
import multiprocessing
import unittest
from client import Shape
class TestClient(unittest.TestCase):
def test_shape_sync(self):
parent_conn, child_conn = multiprocessing.Pipe()
shape = Shape(table='items')
shape.subscribe(child_conn.send)
p = multiprocessing.Process(target=shape.sync)
p.start()
data = parent_conn.recv()
self.assertEqual(type(data), dict)
p.kill()
if __name__ == '__main__':
unittest.main()
Make sure you have Electric running and then:
$ python client.test.py
.
----------------------------------------------------------------------
Ran 1 test in 0.087s
OK