Models

The Models SDK enables you to keep your frontend applications up to date with your backend database. A model is a single instance of live, observable data. It uses a sync function to initialize with the latest data and a merge function to update its state upon receiving change events from your backend database.

To receive notifications when there are changes in the state, subscribe to changes.

Create a model using the models.get() method on the client. If you specify a name that already exists, it will be returned.

Instantiate a model using a unique name. A unique name identifies the model on the client and corresponds to the channel name used to subscribe to state updates from the backend:

Select...
const model = modelsClient.models.get({ channelName: 'post:123', sync, merge, });
Copied!

The sync function instructs your model to initialize with the latest data from the backend and is necessary when creating a model. It can be any function that optionally accepts parameters and returns a promise with the latest state of your data model, along with a sequenceId.

The following is an example sync function that fetches the latest model state from your database:

Select...
async function sync(id: number, page: number) { const result = await fetch(`/api/post/${id}?page=${page}`); return result.json(); }
Copied!

The following example is an output from the sync function:

{ "sequenceId": "1", "data": { "id": 123, "text": "Hello World", "comments": [] } }
Copied!

The sync function needs to be registered when a model is instantiated:

Select...
const model = modelsClient.models.get({ channelName: 'post:123', sync, merge, });
Copied!

Call the sync function directly on the model to bootstrap its state during the initial page load:

Select...
await model.sync(123, 1);
Copied!

The SDK automatically attempts to re-execute the sync function in the case of an error, as specified by the syncOptions.retryStrategy defined in your ClientOptions.

The sequenceId enables the SDK to identify the point in the stream of change events that corresponds to the current version of the database’s state.

Internally, the SDK replays change events starting from the point indicated by the sequenceId to update the model state with any realtime changes occurring since the state was read from the database.

The sync function calls your backend endpoint to retrieve the highest sequenceId present in the outbox when reading the state:

-- Start a database transaction BEGIN; -- Query your model state SELECT * FROM …; -- Obtain the sequence ID SELECT COALESCE(MAX(sequence_id), 0) FROM outbox; -- Commit the database transaction COMMIT;
Copied!

Use the read committed transaction isolation level for the correct function, when reading the database state. PostgreSQL uses this as default.

The SDK must initialize the model’s state using the sync function and then precisely apply a sequence of change events from your backend, beginning at the correct point in the stream. This starting point is the sequenceId your backend endpoint returns.

After the SDK executes the sync function, it internally connects to the channel and begins subscribing to live, yet unprocessed, messages. It then paginates backward through the channel’s message history, starting from the attachment point, using untilAttach. The number of items on each page can be configured via the syncOptions.historyPageSize defined in your ClientOptions.

When a message is sent to the sequenceId, the model processes subsequent messages to maintain continuity with live messages. If the target sequenceId is unreachable, there is insufficient message history to resume from the correct point to update the model. The SDK will attempt a re-sync. It re-syncs by calling the sync function again to acquire a newer version of the model state.

The amount of history available to query on the channel is determined by your message storage configuration on the channel. This configuration must match the syncOptions.messageRetentionPeriod defined in your ClientOptions. The SDK uses this configuration option as a hint as to whether to resynchronize via the sync function and skip paginating through history when messages are expected to have expired.

To ensure the model’s state is up to date, cache the state obtained from the backend endpoint used by the sync function. The model state returned from this endpoint must not exceed the message retention period set on the channel. If no previous messages exist on the channel, the model is assumed to be a new state without any modifications.

A merge function is required when you instantiate a model. The merge function instructs your model how to compute the next version of the model state upon receiving a change event from the backend. The merge function takes the previous model state and the change event as inputs and outputs the next version of the model state.

The following example is a merge function, invoked for all events received on the channel name specified in the model:

Select...
function merge(state: Post, event: OptimisticEvent | ConfirmedEvent) { if (event.name === 'addComment') { return { ...state, comments: state.comments.concat([event.data]), }; } // handle other event types }
Copied!

The merge function needs to be registered when a model is instantiated:

Select...
const model = modelsClient.models.get({ channelName: 'post:123', sync, merge, });
Copied!

The event passed to the merge function can be either confirmed or optimistic:

Confirmed
change events received from your backend. They describe the result of a change to the data which has been committed to your database.
Optimistic
events that describe mutations that have happened locally, but have not yet been confirmed by your backend.

The SDK might call the merge function multiple times with the same state and event; to rebase optimistic events onto the newly confirmed state. It’s essential that the merge function, when executed with identical inputs, consistently produces the same output. The merge function should be pure and deterministic, not depending on any external state.

Change events from your backend are delivered to the SDK as messages over the channels. By default, messages are processed by the SDK in the order in which they are received.

The SDK supports a short buffering time buffering for change events, facilitating short-term reordering and de-duplication. By default, his feature is not enabled. To activate it, set the eventBufferOptions.bufferMS in the ClientOption to a non-zero value.

When using the event buffer, change events are de-duplicated and ordered according to their message.ID, which corresponds to the change event’s sequenceId.

By default, the events in the buffer are ordered numerically if the message.ID can be coerced to a number. Otherwise, events will be ordered lexicographically by their message.ID. You can specify a custom ordering based on any part of the message via the eventBufferOptions.eventOrderer ClientOption.

The Models SDK supports optimistic updates, a feature that enables you to immediately render changes to your data model before the backend confirms the changes, making updates appear instantaneous.

Optimistic events are used to make local, provisional changes to your data, anticipating that your backend will eventually confirm or reject these changes. The optimistic event will be processed by your merge function and included in the local model state optimistically.

Use the model.optimistic() method to apply an optimistic update to your model.

  1. Call model.optimistic() on your model with the optimistic event.
  2. Apply the corresponding change to your backend.
  3. Await the confirmation of the optimistic update from the backend, or optionally cancel the optimistic update.

The model.optimistic() function returns a promise resolving to two values:

  • A confirmation promise that resolves when the backend confirms or rejects the optimistic update.
  • A function to explicitly cancel the optimistic update.

The following demonstrates an optimistic update to implementing changes in the model:

Select...
// optimistically apply the changes to the model const [confirmation, cancel] = await model.optimistic({ mutationId: 'my-mutation-id', name: 'addComment', data: 'New comment!', }); try { // apply the changes in your backend await updatePost('my-mutation-id', 'New comment!'); // wait for the optimistic event to be confirmed await confirmation; } catch (err) { // something went wrong, cancel the optimistic update cancel(); }
Copied!

The SDK requires a mechanism to identify when a change event received from the backend matches an optimistic event that has already been applied locally.

To achieve this, your clients need to assign a unique mutationId to each optimistic event. This ID can be any string, though it’s commonly a UUID. The mutationId must be sent to your backend and included in the confirmation event that your backend writes to the outbox:

BEGIN; -- mutate your data, e.g.: INSERT INTO comments (comment) VALUES ('New comment!'); -- write change event to outbox, e.g.: INSERT INTO outbox (mutation_id, channel, name, ...) VALUES ('my-mutation-id', 'posts:123', 'addComment', ...); COMMIT;
Copied!

The SDK automatically rewinds the optimistic event and rejects the associated confirmation promise if the corresponding confirmed change event is not received from the backend within a specified timeframe. You can configure this timeout period using the optimisticEventOptions.timeout ClientOption.

You can also broadcast a rejection event from your backend in order to explicitly reject a given optimistic update. This is achieved by setting the rejected flag to true in the outbox record:

You can reject optimistic updates by broadcasting a rejection event from your backend. Set the rejected flag to true in the outbox record:

BEGIN; -- mutate your data, e.g.: INSERT INTO comments (comment) VALUES ('New comment!'); -- write change event to outbox, e.g.: INSERT INTO outbox (mutation_id, channel, name, rejected, ...) VALUES ('my-mutation-id', 'posts:123', 'addComment', true, ...); COMMIT;
Copied!

To receive realtime notifications when there are changes in the state, you can subscribe to changes to the changes.

The following example subscribes to model state changes:

Select...
model.subscribe((err, post) => { /* model state updated! */ });
Copied!

Subscriptions operate on an optimistic model by default. The callback is triggered whenever the optimistic state changes or confirmed changes occur.

To respond solely to confirmed changes, set the optimistic option to false in the subscribe():

Select...
model.subscribe((err, post) => { /* ... */ }, { optimistic: false });
Copied!

A model instance can be in one of the following lifecycle states:

initialized
the model has been initialized but has not yet attached to the underlying channel.
syncing
the model is synchronizing its state from the backend.
ready
the model is attached to the channel and processing realtime.
paused
the user has paused the model.
errored
the model has errored processing data from the sync, or from the stream.
disposed
the model has been disposed, either by the user disposing it or an unrecoverable error.

Listen for model state change events on a model instance:

Select...
model.on('paused', () => { /* model paused*/ }); model.on('ready', () => { /* model resumed */ }); model.on('disposed', () => { /* model disposed */ });
Copied!

You can pause a model to prevent the model from consuming realtime updates, while reserving the ability to resume it at some point in the future. Pausing can be useful in instances such as when the UI rendering the model data is temporarily out-of-view.

The following pauses the model. New events will not be processed and subscription callbacks will not be invoked:

Select...
await model.pause();
Copied!

The following example un-pauses the model. Event processing will resume and changes will be made available to subscribers:

Select...
await model.resume();
Copied!

When a model is no longer needed, disposed of it to free up resources:

Select...
await model.dispose();
Copied!
Create or retrieve a model
v2.0