Reading Data

This tutorial shows the basics of reading data from a MongoDB server. It assumes that you have read the Connecting to a MongoDB Server tutorial, and that you have a MongoDB server that contains data you wish to read.

Declaring Application State Type

This program will require that certain objects outlive the sub-operations, so we will need to persist them outside the event loop and pass them to our continuation. We declare a simple aggregate type to hold these objects:

Application State Struct
// Application state
typedef struct {
    // The client
    amongoc_client* client;
    // The collection we are reading from
    amongoc_collection* collection;
    // The name of the database we read from
    const char* db_name;
    // The name of the collection within the database
    const char* coll_name;
    // The number of batches we have printed so far
    int         batch_num;
} app_state;

Parsing Command Arguments

Our program takes three parameters: a MongoDB server URI, a database name, and a collection name. We can parse those at the start of main:

Argument Handling
int main(int argc, char** argv) {
    // Program parameters
    if (argc != 4) {
        fprintf(stderr, "Usage: %s <uri> <database> <collection>\n", argv[0]);
        return 2;
    }
    // The server URI
    const char* const uri = argv[1];

    // Initialize our state
    app_state state = {
        .db_name   = argv[2],
        .coll_name = argv[3],
    };

We store the database name and the collection name in the shared state struct so that we can access them in subsequent operations.
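
Because the initializer names only two members, C zero-initializes the rest of the struct: the two handle pointers start out null and batch_num starts at 0. The following is a minimal sketch of that rule in isolation. The names demo_state and make_demo_state are hypothetical stand-ins invented here, and void* replaces the amongoc handle types so that the sketch compiles without the library:

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical stand-in for the tutorial's app_state. The handle types are
// replaced with void* so this compiles without the amongoc headers.
typedef struct {
    void*       client;
    void*       collection;
    const char* db_name;
    const char* coll_name;
    int         batch_num;
} demo_state;

// Build a state the same way main does: name two members, omit the rest.
static demo_state make_demo_state(const char* db_name, const char* coll_name) {
    assert(db_name != NULL && coll_name != NULL);
    demo_state state = {
        .db_name   = db_name,
        .coll_name = coll_name,
    };
    // Members without an explicit initializer are zero-initialized:
    // both pointers are null and batch_num is 0.
    return state;
}
```

A call such as make_demo_state("test", "people") therefore yields a struct whose client and collection members are NULL and whose batch_num is 0, with no explicit assignment needed.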

Starting with a Connection

We next do the basics of setting up an event loop and creating a connection emitter:

Setting up an event loop
    amongoc_loop loop;
    amongoc_if_error (amongoc_default_loop_init(&loop), msg) {
        fprintf(stderr, "Error initializing the event loop: %s\n", msg);
        return 1;
    }

    // Connect
    amongoc_emitter em = amongoc_client_new(&loop, uri);

Create the First Continuation

We have the pending connection object and the application state, so now we can set the first continuation:

The first continuation
    // Set the continuation
    em = amongoc_let(em, amongoc_async_forward_errors, amongoc_box_pointer(&state), on_connect);

The arguments are as follows:

  1. We pass the emitter we got from amongoc_client_new(), and replace it with the new operation emitter returned by amongoc_let().

  2. amongoc_async_forward_errors is a behavior control flag for amongoc_let() that tells the operation to skip our continuation if the input operation generates an error.

  3. We pass the address of the state object by wrapping it with amongoc_box_pointer(), passing it as the userdata parameter of amongoc_let().

  4. on_connect is the name of the function that will handle the continuation.
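
To make the roles of the flag and the userdata concrete, here is a deliberately simplified, synchronous model of the pattern. It is not how amongoc_let() is implemented: demo_status, demo_let, demo_forward_errors, and count_calls are all hypothetical names invented for this sketch, and the real function works with emitters and boxes rather than plain values.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical status type: a code of 0 means success.
typedef struct {
    int code;
} demo_status;

// Hypothetical continuation: receives the userdata, the status of the
// previous step, and the value that step produced.
typedef demo_status (*demo_continuation)(void* userdata, demo_status status, int value);

// Hypothetical flag playing the role of amongoc_async_forward_errors.
enum { demo_forward_errors = 1 };

// Model of "let": run `cont` on the result of an earlier step, unless the
// flag asks us to forward errors and that step failed.
static demo_status demo_let(demo_status       input_status,
                            int               input_value,
                            unsigned          flags,
                            void*             userdata,
                            demo_continuation cont) {
    assert(cont != NULL);
    if ((flags & demo_forward_errors) && input_status.code != 0) {
        // Skip the continuation and pass the error along unchanged.
        return input_status;
    }
    return cont(userdata, input_status, input_value);
}

// Example continuation: the userdata points at a call counter.
static demo_status count_calls(void* userdata, demo_status status, int value) {
    (void)value;
    int* calls = (int*)userdata;
    ++*calls;
    return status;
}
```

In this model, calling demo_let with a failed status and demo_forward_errors returns the failure without ever invoking count_calls, while a successful status invokes it once. That mirrors why a continuation attached with the forwarding flag does not need to inspect its status argument.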

Define the Continuation

Now we can look at the definition of on_connect:

Continuation signature
// Continuation after connection completes
static amongoc_emitter on_connect(amongoc_box state_, amongoc_status status, amongoc_box client) {
    // We don't use the status
    (void)status;

The state_ parameter is the userdata box that was given to amongoc_let() in the previous section. The client parameter is a box that contains the amongoc_client pointer from the connection operation. The status parameter is the status of the connection operation, but we can ignore it here: because we passed amongoc_async_forward_errors, amongoc_let() skips this function whenever the status indicates an error, so on_connect is only called if the connection actually succeeds.

Update the Application State

on_connect is passed the pointer to the application state as well as a box containing a pointer to the amongoc_client. We can extract the box value and store the client pointer within our application state struct:

Update the Application State
    // Store the client object
    app_state* state = amongoc_box_cast(app_state*, state_);
    amongoc_box_take(state->client, client);

We store it in the application state object so that we can later delete the client handle when the program completes.

Create a Collection Handle

Now that we have a valid client, we can create a handle to a collection on the server:

Set up the Collection
    // Create a new collection handle
    state->collection = amongoc_collection_new(state->client, state->db_name, state->coll_name);

amongoc_collection_new() does not actually communicate with the server; it only creates a client-side handle that can be used to perform operations related to that collection.

We store the returned collection handle in the state struct so that we can later delete it.

Initiate a Read Operation

There are several different reading operations and reading parameters. For this program, we’ll simply do a “find all” operation:

Start a “Find All” Operation
    // Initiate a read operation.
    amongoc_emitter em = amongoc_find(state->collection, bson_view_null, NULL);
    return amongoc_let(em, amongoc_async_forward_errors, state_, on_find);

The amongoc_find() function will return a new amongoc_emitter that represents the pending read from a collection. The first parameter state->collection is a pointer to a collection handle. The second parameter bson_view_null is a filter for the find operation. In this case, we are passing no filter, which is equivalent to an empty filter, telling the database that we want to find all documents in the collection. The third parameter is a set of common named parameters for find operations. In this case, we are passing NULL, because we don’t care to specify any more parameters.

We attach a second continuation using amongoc_let() to a function on_find.

Read from the Cursor

When the amongoc_find() emitter resolves, it resolves with an amongoc_cursor object. We’ll extract that in on_find:

Handle Data
// Continuation when we get data
static amongoc_emitter on_find(amongoc_box state_, amongoc_status status, amongoc_box cursor_) {
    // We don't use the status
    (void)status;
    app_state* state = amongoc_box_cast(app_state*, state_);
    // Extract the cursor
    amongoc_cursor cursor;
    amongoc_box_take(cursor, cursor_);

We can print out the cursor’s batch of data:

Print the Records
    // Print the data
    fprintf(stderr,
            "Got results from database '%s' collection '%s', batch %d: ",
            state->db_name,
            state->coll_name,
            state->batch_num);
    bson_write_repr(stderr, cursor.records);
    fprintf(stderr, "\n");
    state->batch_num++;

Read Some More

A single read may not return the full set of data, so we may need to initiate a subsequent read with amongoc_cursor_next():

Check for more
    // Do we have more data?
    if (cursor.cursor_id) {
        // More to find
        amongoc_emitter em = amongoc_cursor_next(cursor);
        return amongoc_let(em, amongoc_async_forward_errors, state_, on_find);
    } else {
        // Done
        amongoc_cursor_delete(cursor);
        return amongoc_just();
    }
}

By passing on_find to amongoc_let() again, we create a loop that continually calls on_find until the cursor ID is zero (indicating a finished read).

If we have no more data, we destroy the cursor object and return a null result immediately with amongoc_just().
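
The control flow of this loop can be sketched without a server. The following is a toy simulation, not amongoc code: fake_cursor, fake_fetch, and count_batches are hypothetical names, and the rule that the cursor ID drops to zero on the batch containing the last document is an assumption of the simulation rather than a statement about how a real server behaves.

```c
#include <assert.h>

// Hypothetical in-memory cursor over `total` documents.
typedef struct {
    int next_index;  // Index of the first document not yet delivered
    int total;       // Total number of documents in the fake collection
    int batch_size;  // Maximum number of documents per batch
    int cursor_id;   // Nonzero while more documents remain
    int batch_len;   // Number of documents in the current batch
} fake_cursor;

// Deliver the next batch and update the cursor ID.
static void fake_fetch(fake_cursor* c) {
    int remaining = c->total - c->next_index;
    c->batch_len  = remaining < c->batch_size ? remaining : c->batch_size;
    c->next_index += c->batch_len;
    // Assumption of this sketch: the ID becomes zero once nothing is left.
    c->cursor_id = (c->next_index < c->total) ? 42 : 0;
}

// Count how many times the batch handler would run.
static int count_batches(int total, int batch_size) {
    assert(total >= 0 && batch_size > 0);
    fake_cursor c = {.total = total, .batch_size = batch_size};
    int batches = 0;
    fake_fetch(&c);  // Plays the role of the initial find
    for (;;) {
        ++batches;  // The handler processes the current batch
        if (c.cursor_id == 0) {
            break;  // Finished: the handler would delete the cursor here
        }
        fake_fetch(&c);  // Plays the role of requesting the next batch
    }
    return batches;
}
```

Under these assumptions, five documents delivered in batches of two take three passes through the handler, and an empty collection still produces one (empty) batch, because the handler always runs once for the initial result.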

Tie, Start, and Run the Operation

Going back to main, we can now tie the operation, start it, and run the event loop:

Run the Program
    // Run the program and collect the result
    amongoc_status    status;
    amongoc_operation op = amongoc_tie(em, &status);
    amongoc_start(&op);
    amongoc_default_loop_run(&loop);
    amongoc_operation_delete(op);
    amongoc_collection_delete(state.collection);
    amongoc_client_delete(state.client);
    amongoc_default_loop_destroy(&loop);

amongoc_tie() tells the emitter to store its final status and result value in the given destinations, and returns an operation that can be initiated with amongoc_start(). We then give control to the event loop with amongoc_default_loop_run(). After this returns, the operation object is finished, so we delete it with amongoc_operation_delete().

We are also finished with the collection handle and the client, so we delete those here as well. Those struct members are filled in by on_connect once the connection succeeds.

Check for Errors

If any sub-operation failed, the error status will be propagated to the final status, so we check that before exiting:

Check for Errors
    // Check for errors
    amongoc_if_error (status, msg) {
        fprintf(stderr, "Error: %s\n", msg);
        return 1;
    }

The Full Program

Here is the full sample program:

read.example.c
#include <amongoc/amongoc.h>

#include <stdio.h>

// Application state
typedef struct {
    // The client
    amongoc_client* client;
    // The collection we are reading from
    amongoc_collection* collection;
    // The name of the database we read from
    const char* db_name;
    // The name of the collection within the database
    const char* coll_name;
    // The number of batches we have printed so far
    int         batch_num;
} app_state;

static amongoc_emitter on_connect(amongoc_box state, amongoc_status status, amongoc_box client);

int main(int argc, char** argv) {
    // Program parameters
    if (argc != 4) {
        fprintf(stderr, "Usage: %s <uri> <database> <collection>\n", argv[0]);
        return 2;
    }
    // The server URI
    const char* const uri = argv[1];

    // Initialize our state
    app_state state = {
        .db_name   = argv[2],
        .coll_name = argv[3],
    };

    amongoc_loop loop;
    amongoc_if_error (amongoc_default_loop_init(&loop), msg) {
        fprintf(stderr, "Error initializing the event loop: %s\n", msg);
        return 1;
    }

    // Connect
    amongoc_emitter em = amongoc_client_new(&loop, uri);

    // Set the continuation
    em = amongoc_let(em, amongoc_async_forward_errors, amongoc_box_pointer(&state), on_connect);

    // Run the program and collect the result
    amongoc_status    status;
    amongoc_operation op = amongoc_tie(em, &status);
    amongoc_start(&op);
    amongoc_default_loop_run(&loop);
    amongoc_operation_delete(op);
    amongoc_collection_delete(state.collection);
    amongoc_client_delete(state.client);
    amongoc_default_loop_destroy(&loop);

    // Check for errors
    amongoc_if_error (status, msg) {
        fprintf(stderr, "Error: %s\n", msg);
        return 1;
    }
}

static amongoc_emitter on_find(amongoc_box state_, amongoc_status status, amongoc_box cursor_);

// Continuation after connection completes
static amongoc_emitter on_connect(amongoc_box state_, amongoc_status status, amongoc_box client) {
    // We don't use the status
    (void)status;
    // Store the client object
    app_state* state = amongoc_box_cast(app_state*, state_);
    amongoc_box_take(state->client, client);
    // Create a new collection handle
    state->collection = amongoc_collection_new(state->client, state->db_name, state->coll_name);
    // Initiate a read operation.
    amongoc_emitter em = amongoc_find(state->collection, bson_view_null, NULL);
    return amongoc_let(em, amongoc_async_forward_errors, state_, on_find);
}

// Continuation when we get data
static amongoc_emitter on_find(amongoc_box state_, amongoc_status status, amongoc_box cursor_) {
    // We don't use the status
    (void)status;
    app_state* state = amongoc_box_cast(app_state*, state_);
    // Extract the cursor
    amongoc_cursor cursor;
    amongoc_box_take(cursor, cursor_);

    // Print the data
    fprintf(stderr,
            "Got results from database '%s' collection '%s', batch %d: ",
            state->db_name,
            state->coll_name,
            state->batch_num);
    bson_write_repr(stderr, cursor.records);
    fprintf(stderr, "\n");
    state->batch_num++;

    // Do we have more data?
    if (cursor.cursor_id) {
        // More to find
        amongoc_emitter em = amongoc_cursor_next(cursor);
        return amongoc_let(em, amongoc_async_forward_errors, state_, on_find);
    } else {
        // Done
        amongoc_cursor_delete(cursor);
        return amongoc_just();
    }
}