Advanced¶
Versions¶
Monstache version | Git branch (used to build plugin) | Docker tag | Description | Elasticsearch | MongoDB | Status |
---|---|---|---|---|---|---|
6 | rel6 | rel6, latest | MongoDB, Inc. go driver | Version 7+ | Version 2.6+ | Supported |
5 | rel5 | rel5 | MongoDB, Inc. go driver | Version 6 | Version 2.6+ | Supported |
4 | master | rel4 | mgo community go driver | Version 6 | Version 3 | Deprecated |
3 | rel3 | rel3 | mgo community go driver | Versions 2 and 5 | Version 3 | Deprecated |
Note
You can use monstache rel5 and rel6 with MongoDB versions back to 2.6 with the following caveats.
If you have MongoDB 3.6 then you must explicitly enumerate collections in your change-stream-namespaces setting because change streams against databases and entire deployments were not introduced until MongoDB version 4.0. Alternatively, you can disable change events entirely with disable-change-events. You must also set resume-strategy to 1 to use a token-based resume strategy compatible with the MongoDB 3.6 API.
If you have MongoDB 2.6 - 3.5 then you must omit any mention of change-stream-namespaces in your config file because change streams were first introduced in 3.6. To emulate change events you must turn on the option enable-oplog. Alternatively, you can disable change events entirely with disable-change-events.
Warning
Your MongoDB binary version does not always mean that the feature compatibility is at that same level. Check your feature compatibility version from the MongoDB console to ensure that MongoDB is not operating in a lesser capability mode. This sometimes happens when MongoDB is upgraded in place or MongoDB is started with a data directory of a previous installation. Sometimes there are reasons to stay at a lower feature compatibility so check before you upgrade it.
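If you prefer to script the check, the following is a minimal sketch using the official MongoDB Go driver; it is equivalent to running db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1}) in the mongo shell. The connection URI is an assumption.
package main

import (
    "context"
    "fmt"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx := context.Background()
    // the connection URI is an assumption; use your own deployment address
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(ctx)
    var result bson.M
    cmd := bson.D{{Key: "getParameter", Value: 1}, {Key: "featureCompatibilityVersion", Value: 1}}
    if err := client.Database("admin").RunCommand(ctx, cmd).Decode(&result); err != nil {
        panic(err)
    }
    // prints the featureCompatibilityVersion document returned by the server
    fmt.Println(result["featureCompatibilityVersion"])
}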
GridFS Support¶
Monstache supports indexing the raw content of files stored in GridFS into Elasticsearch for full text search. This feature requires that you install an Elasticsearch plugin which enables the attachment field type.
For versions of Elasticsearch prior to version 5 you should install the mapper-attachments plugin.
For version 5 or later of Elasticsearch you should instead install the ingest-attachment plugin.
Once you have installed the appropriate plugin for Elasticsearch, getting file content from GridFS into Elasticsearch is as simple as configuring monstache. You will want to enable the index-files option and also tell monstache the namespace of all collections which will hold GridFS files. For example in your TOML config file,
index-files = true
direct-read-namespaces = ["users.fs.files", "posts.fs.files"]
file-namespaces = ["users.fs.files", "posts.fs.files"]
file-highlighting = true
The above configuration tells monstache that you wish to index the raw content of GridFS files in the users and posts MongoDB databases. By default, MongoDB uses a bucket named fs, so if you just use the defaults your collection name will be fs.files. However, if you have customized the bucket name, then your file collection would be something like mybucket.files and the entire namespace would be users.mybucket.files.
When you configure monstache this way it will perform an additional operation at startup to ensure the destination indexes in Elasticsearch have a field named file with a type mapping of attachment.
For the example TOML configuration above, monstache would initialize 2 indices in preparation for indexing into Elasticsearch by issuing the following REST commands:
For Elasticsearch versions prior to version 5...
POST /users.fs.files
{
"mappings": {
"fs.files": {
"properties": {
"file": { "type": "attachment" }
}}}}
POST /posts.fs.files
{
"mappings": {
"fs.files": {
"properties": {
"file": { "type": "attachment" }
}}}}
For Elasticsearch version 5 and above...
PUT /_ingest/pipeline/attachment
{
"description" : "Extract file information",
"processors" : [
{
"attachment" : {
"field" : "file"
}
}
]
}
When a file is inserted into MongoDB via GridFS, monstache will detect the new file, use the MongoDB API to retrieve the raw content, and index a document into Elasticsearch with the raw content stored in a file field as a base64 encoded string. The Elasticsearch plugin will then extract text content from the raw content using Apache Tika, tokenize the text content, and allow you to query on the content of the file.
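Conceptually, the indexed document carries the file bytes the way this Go sketch shows; the file name here is just an example.
package main

import (
    "encoding/base64"
    "fmt"
    "os"
)

func main() {
    // resume.docx is an example file name only
    raw, err := os.ReadFile("resume.docx")
    if err != nil {
        panic(err)
    }
    // monstache stores the raw bytes as a base64 string in the "file" field
    doc := map[string]interface{}{
        "file": base64.StdEncoding.EncodeToString(raw),
    }
    fmt.Println(len(doc["file"].(string)), "base64 characters to be indexed")
}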
To test this feature of monstache you can simply use the mongofiles command to quickly add a file to MongoDB via GridFS. Continuing the example above, one could issue the following command to put a file named resume.docx into GridFS; after a short time this file should be searchable in Elasticsearch in the index users.fs.files.
mongofiles -d users put resume.docx
After a short time you should be able to query the contents of resume.docx in the users index in Elasticsearch
curl -XGET "http://localhost:9200/users.fs.files/_search?q=golang"
If you would like to see the text extracted by Apache Tika you can project the appropriate sub-field
For Elasticsearch versions prior to version 5...
curl -H "Content-Type:application/json" localhost:9200/users.fs.files/_search?pretty -d '{
"fields": [ "file.content" ],
"query": {
"match": {
"file.content": "golang"
}
}
}'
For Elasticsearch version 5 and above...
curl -H "Content-Type:application/json" localhost:9200/users.fs.files/_search?pretty -d '{
"_source": [ "attachment.content" ],
"query": {
"match": {
"attachment.content": "golang"
}
}
}'
When file-highlighting is enabled you can add a highlight clause to your query
For Elasticsearch versions prior to version 5...
curl -H "Content-Type:application/json" localhost:9200/users.fs.files/_search?pretty -d '{
"fields": ["file.content"],
"query": {
"match": {
"file.content": "golang"
}
},
"highlight": {
"fields": {
"file.content": {
}
}
}
}'
For Elasticsearch version 5 and above...
curl -H "Content-Type:application/json" localhost:9200/users.fs.files/_search?pretty -d '{
"_source": ["attachment.content"],
"query": {
"match": {
"attachment.content": "golang"
}
},
"highlight": {
"fields": {
"attachment.content": {
}
}
}
}'
The highlight response will contain emphasis on the matching terms
For Elasticsearch versions prior to version 5...
"hits" : [ {
"highlight" : {
"file.content" : [ "I like to program in <em>golang</em>.\n\n" ]
}
} ]
For Elasticsearch version 5 and above...
"hits" : [{
"highlight" : {
"attachment.content" : [ "I like to program in <em>golang</em>." ]
}
}]
Workers¶
You can run multiple monstache processes and distribute the work between them. First configure the names of all the workers in a shared config.toml file.
workers = ["Tom", "Dick", "Harry"]
In this case we have 3 workers. Now we can start 3 monstache processes and give each one of the worker names.
monstache -f config.toml -worker Tom
monstache -f config.toml -worker Dick
monstache -f config.toml -worker Harry
monstache will hash the id of each document using consistent hashing so that each id is handled by only one of the available workers.
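The following Go program illustrates the idea; it is a conceptual sketch, not monstache's exact hash function. The document id is hashed and mapped onto the list of configured workers, so the same id always lands on the same worker.
package main

import (
    "fmt"
    "hash/fnv"
)

// pickWorker maps a document id onto one of the configured workers.
func pickWorker(id string, workers []string) string {
    h := fnv.New32a()
    h.Write([]byte(id))
    return workers[h.Sum32()%uint32(len(workers))]
}

func main() {
    workers := []string{"Tom", "Dick", "Harry"}
    for _, id := range []string{"id-1", "id-2", "id-3"} {
        fmt.Println(id, "is handled by", pickWorker(id, workers))
    }
}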
High Availability¶
You can run monstache in high availability mode by starting multiple processes with the same value for cluster-name. Each process will join a cluster which works together to ensure that a monstache process is always syncing to Elasticsearch.
High availability works by ensuring one active process in the monstache.cluster collection in MongoDB at any given time. Only the process in this collection will be syncing for the cluster. Processes not present in this collection will be paused. Documents in the monstache.cluster collection have a TTL assigned to them. When a document in this collection times out it will be removed from the collection by MongoDB, and another process in the monstache cluster will have a chance to write to the collection and become the new active process.
When cluster-name is supplied the resume feature is automatically turned on and the resume-name becomes the name of the cluster. This is to ensure that each of the processes is able to pick up syncing where the last one left off.
You can combine the HA feature with the workers feature. For 3 cluster nodes with 3 workers per node you would have something like the following:
# config.toml
workers = ["Tom", "Dick", "Harry"]
# on host A
monstache -cluster-name HA -worker Tom -f config.toml
monstache -cluster-name HA -worker Dick -f config.toml
monstache -cluster-name HA -worker Harry -f config.toml
# on host B
monstache -cluster-name HA -worker Tom -f config.toml
monstache -cluster-name HA -worker Dick -f config.toml
monstache -cluster-name HA -worker Harry -f config.toml
# on host C
monstache -cluster-name HA -worker Tom -f config.toml
monstache -cluster-name HA -worker Dick -f config.toml
monstache -cluster-name HA -worker Harry -f config.toml
When the clustering feature is combined with workers then the resume-name becomes the cluster name concatenated with the worker name.
Index Mapping¶
When indexing documents from MongoDB into Elasticsearch the default mapping is as follows:
For Elasticsearch prior to 6.2
Elasticsearch index name <= MongoDB database name . MongoDB collection name
Elasticsearch type <= MongoDB collection name
Elasticsearch document _id <= MongoDB document _id
For Elasticsearch 6.2+
Elasticsearch index name <= MongoDB database name . MongoDB collection name
Elasticsearch type <= _doc
Elasticsearch document _id <= MongoDB document _id
If these defaults won't work for some reason, you can override the index and type mapping on a per-collection basis by adding the following to your TOML config file:
[[mapping]]
namespace = "test.test"
index = "index1"
type = "type1"
[[mapping]]
namespace = "test.test2"
index = "index2"
type = "type2"
With the configuration above, documents in the test.test namespace in MongoDB are indexed into the index1 index in Elasticsearch with the type1 type.
If you need your index and type mapping to be more dynamic, such as based on values inside the MongoDB document, then see the sections Middleware and Routing.
Warning
It is not recommended to override the default type of _doc if using Elasticsearch 6.2+ since this will be the supported path going forward. Also, using _doc as the type will not work with Elasticsearch prior to 6.2.
Make sure that automatic index creation is not disabled in elasticsearch.yml or create your target indexes before using Monstache.
If automatic index creation must be controlled, whitelist any indexes in elasticsearch.yml that monstache will create.
Namespaces¶
When a document is inserted, updated, or deleted in MongoDB a document is appended to the oplog representing the event. This document has a field ns which is the namespace. For inserts, updates, and deletes the namespace is the database name and collection name of the document changed joined by a dot. E.g. for use test; db.foo.insert({hello: "world"}); the namespace for the event in the oplog would be test.foo.
In addition to inserts, updates, and deletes monstache also supports database and collection drops. When a database or collection is dropped in MongoDB an event is appended to the oplog. Like the other types of changes this event has a field ns representing the namespace. However, for drops the namespace is the database name and the string $cmd joined by a dot. E.g. for use test; db.foo.drop() the namespace for the event in the oplog would be test.$cmd.
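An illustrative Go helper showing how such a namespace splits into its database and collection (or $cmd) parts:
package main

import (
    "fmt"
    "strings"
)

// splitNamespace: the part before the first dot is the database,
// the remainder is the collection (or $cmd for drops).
func splitNamespace(ns string) (db, coll string) {
    parts := strings.SplitN(ns, ".", 2)
    if len(parts) < 2 {
        return ns, ""
    }
    return parts[0], parts[1]
}

func main() {
    for _, ns := range []string{"test.foo", "test.$cmd"} {
        db, coll := splitNamespace(ns)
        fmt.Printf("ns=%s database=%s collection=%s\n", ns, db, coll)
    }
}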
Middleware¶
monstache supports embedding user defined middleware between MongoDB and Elasticsearch. Middleware is able to transform documents, drop documents, or define indexing metadata. Middleware may be written in either Javascript or in Golang as a plugin.
Warning
It is HIGHLY recommended to use a golang plugin in production over a javascript plugin due to performance differences. Currently, golang plugins are orders of magnitude faster than javascript plugins. This is due to concurrency and the need to perform locking on the javascript environment. Javascript plugins are very useful for quickly prototyping a solution, however at some point it is recommended to convert them to golang plugins.
If you enable a Golang plugin then monstache will ignore any Javascript middleware in your configuration. This may change in the future, but for now the choice of middleware language is mutually exclusive.
Golang¶
monstache supports golang plugins. You should have golang version 1.11 or greater installed and will need to perform the build on the Linux or OSX platform. Golang plugins are not currently supported on the Windows platform due to limits in golang.
To implement a plugin for monstache you need to implement specific function signatures, use the go command to build a .so file for your plugin, and finally pass the path to your plugin .so file when running monstache.
See this wiki page for an example using Docker.
Warning
Golang plugins must be built with the exact same source code (including dependencies) of the loading program.
If you don't build your plugin this way then monstache may fail to load it at runtime due to source code mismatches.
To create a golang plugin for monstache:
- git clone monstache somewhere outside your $GOPATH
- git checkout a specific monstache version tag (e.g. v6.7.4). See Versions above.
- in the monstache root directory run go install to build the monstache binary. It should now be in $GOPATH/bin
- create a .go source file for your plugin in the monstache root directory with the package name main
- implement one or more of the following functions: Map, Filter, Pipeline, Process
func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error)
func Filter(input *monstachemap.MapperPluginInput) (keep bool, err error)
func Pipeline(ns string, changeStream bool) (stages []interface{}, err error)
func Process(input *monstachemap.ProcessPluginInput) error
Compile your plugin to a .so with
go build -buildmode=plugin -o myplugin.so myplugin.go
Run the binary you built above with go install (not a release binary) with the following arguments:
$GOPATH/bin/monstache -mapper-plugin-path /path/to/myplugin.so
The following example plugin simply converts top-level string values to uppercase
package main
import (
"github.com/rwynn/monstache/monstachemap"
"strings"
)
// a plugin to convert document values to uppercase
func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) {
doc := input.Document
for k, v := range doc {
switch v.(type) {
case string:
doc[k] = strings.ToUpper(v.(string))
}
}
output = &monstachemap.MapperPluginOutput{Document: doc}
return
}
The input parameter will contain information about the document's origin database and collection:
field | meaning |
---|---|
Document | MongoDB document updated or inserted |
UpdateDescription | If available, the update description |
Namespace | Operation namespace as described above |
Database | MongoDB database from where the event came |
Collection | MongoDB collection where the document was inserted, deleted or updated |
Operation | Which kind of operation triggered this event, see gtm.mapOperation(). "i" for insert, "u" for update, "d" for delete and "c" for invalidate. The Map function will only receive inserts and updates. To handle deletes or invalidates implement the Process function described below. |
Session | *mgo.Session. You need not Close the session as monstache will do this automatically when the function exits |
The output parameter will contain information about how the document should be treated by monstache:
field | meaning |
---|---|
Document | an updated document to index into Elasticsearch |
Index | the name of the index to use |
Type | the document type |
ID | override the document ID |
Routing | the routing value to use |
Drop | set to true to indicate that the document should not be indexed but removed |
Passthrough | set to true to indicate the original document should be indexed unchanged |
Parent | the parent id to use |
Version | the version of the document |
VersionType | the version type of the document (internal, external, external_gte) |
Pipeline | the pipeline to index with |
RetryOnConflict | how many times to retry updates before failing |
Skip | set to true to indicate that the document should be ignored |
For detailed information see monstachemap/plugin.go.
A few examples:
To skip the document (direct monstache to ignore it) set output.Skip = true.
To drop the document (direct monstache not to index it but to remove it) set output.Drop = true.
To simply pass the original document through to Elasticsearch, set output.Passthrough = true.
To set custom indexing metadata on the document use output.Index, output.Type, output.Parent and output.Routing.
Note
If you override output.Index, output.Type, output.Parent or output.Routing for any MongoDB namespaces in a golang plugin you should also add those namespaces to the routing-namespaces array in your config file. This instructs Monstache to query the document metadata so that deletes of the document work correctly.
If you would like to embed other MongoDB documents (possibly from a different collection) within the current document before indexing, you can access the *mgo.Session pointer as input.Session. With the mgo session you can use the mgo API to find documents in MongoDB and embed them in the Document set on output.
When you implement a Filter function, the function is called immediately after reading inserts and updates from the oplog. You can return false from this function to completely ignore a document. This is different than setting output.Drop from the mapping function because when you set output.Drop to true, a delete request is issued to Elasticsearch in case the document had previously been indexed. By contrast, returning false from the Filter function causes the operation to be completely ignored and there is no corresponding delete request issued to Elasticsearch.
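For example, a minimal Filter plugin might ignore documents carrying a hypothetical archived flag:
package main

import (
    "github.com/rwynn/monstache/monstachemap"
)

// Filter ignores documents whose (hypothetical) archived flag is set.
// Returning false skips the event entirely; no delete is sent to Elasticsearch.
func Filter(input *monstachemap.MapperPluginInput) (keep bool, err error) {
    if archived, ok := input.Document["archived"].(bool); ok && archived {
        return false, nil
    }
    return true, nil
}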
When you implement a Pipeline function, the function will be called to set up an aggregation pipeline for both direct reads and any change streams that you have configured. The aggregation pipeline stages that you return may be different depending on whether they are applied to a direct read or to a change stream. For direct reads the root document will be the document in the collection. For change streams the root document will be a change event with a fullDocument field inside it. Use the boolean parameter changeStream to alter the stages that you return from this function accordingly.
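A minimal Pipeline sketch, assuming you want to match on a foo field in both direct reads and change streams:
package main

// Pipeline returns a $match stage; for change streams the document
// is nested under fullDocument, so the match path is adjusted.
func Pipeline(ns string, changeStream bool) (stages []interface{}, err error) {
    field := "foo"
    if changeStream {
        field = "fullDocument.foo"
    }
    stages = []interface{}{
        map[string]interface{}{"$match": map[string]interface{}{field: 1}},
    }
    return
}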
When you implement a Process function, the function will be called after monstache processes each event. This function has full access to the MongoDB and Elasticsearch clients (including the Elasticsearch bulk processor) in the input and allows you to handle complex event processing scenarios. The input parameter for the Process function will have all the same fields as the input to a Map function described above plus the following:
field | meaning |
---|---|
Document | MongoDB document updated, inserted, or deleted |
ElasticClient | A full featured Elasticsearch client |
ElasticBulkProcessor | The same bulk processor monstache uses to index documents. You need only Add requests to the processor and they will be flushed in bulk automatically. Note you must delete the _id field from any argument to the bulk processor Add function |
Timestamp | The MongoDB timestamp of the change event from the oplog. In the case of direct reads the timestamp is the time at which the document was read from MongoDB. |
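As a sketch of what a Process function can do, assuming the rel6 plugin API and the olivere/elastic v7 client, the following mirrors delete events into a hypothetical audit.deletes index via the bulk processor:
package main

import (
    "github.com/olivere/elastic/v7"
    "github.com/rwynn/monstache/monstachemap"
)

// Process mirrors delete events into a hypothetical audit index.
// The document passed to Add deliberately contains no _id field.
func Process(input *monstachemap.ProcessPluginInput) error {
    if input.Operation == "d" {
        doc := map[string]interface{}{
            "ns":      input.Namespace,
            "deleted": true,
        }
        req := elastic.NewBulkIndexRequest().Index("audit.deletes").Doc(doc)
        input.ElasticBulkProcessor.Add(req)
    }
    return nil
}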
Note
Under the docker/plugin folder there is a build.sh script to help you build a plugin. There is a README file in that directory with instructions.
Javascript¶
Monstache supports plugins written in Javascript. You may find that Javascript plugins give you much less performance than golang plugins. You may also reach some limits of what can be done in Javascript. This is due to the implementation of the Javascript environment and the locking required under high load. Javascript plugins are still very useful for quick prototypes and small data sets.
Transformation¶
Monstache uses the amazing otto library to provide transformation at the document field level in Javascript. You can associate one javascript mapping function per MongoDB collection. You can also associate a function at the global level by not specifying a namespace. These javascript functions are added to your TOML config file, for example:
[[script]]
namespace = "mydb.mycollection"
script = """
var counter = 1;
module.exports = function(doc) {
doc.foo += "test" + counter;
counter++;
return doc;
}
"""
[[script]]
namespace = "anotherdb.anothercollection"
path = "path/to/transform.js"
routing = true
[[script]]
# this script does not declare a namespace
# it is global to all collections
script = """
module.exports = function(doc, ns, updateDesc) {
// the doc namespace e.g. test.test is passed as the 2nd arg
// if available, an object containing the update description is passed as the 3rd arg
return _.omit(doc, "password", "secret");
}
"""
The example TOML above configures 3 scripts. The first is applied to mycollection in mydb while the second is applied to anothercollection in anotherdb. The first script is inlined while the second is loaded from a file path. The path can be absolute or relative to the directory monstache is executed from.
The last script does not specify a namespace, so documents from all collections pass through it. Global scripts are run before scripts which are linked to a specific namespace.
You will notice that the multi-line string feature of TOML is used to assign a javascript snippet to the variable named script. The javascript assigned to script must assign a function to the exports property of the module object. This function will be passed the document from MongoDB just before it is indexed in Elasticsearch. Inside the function you can manipulate the document to drop fields, add fields, or augment the existing fields.
The this reference in the mapping function is assigned to the document from MongoDB.
When the return value from the mapping function is an object then that mapped object is what actually gets indexed in Elasticsearch. For these purposes an object is a javascript non-primitive, excluding Function, Array, String, Number, Boolean, Date, Error and RegExp.
Filtering¶
You can completely ignore documents by adding filter configurations to your TOML config file. The filter functions are executed immediately after inserts or updates are read from the oplog. The corresponding document is passed into the function and you can return true or false to include or ignore the document.
[[filter]]
namespace = "db.collection"
script = """
module.exports = function(doc, ns, updateDesc) {
return !!doc.interesting;
}
"""
[[filter]]
namespace = "db2.collection2"
path = "path/to/script.js"
Aggregation Pipelines¶
You can alter or filter direct reads and change streams by using a pipeline definition. Note, when building a pipeline for a change stream, the root of the document will be the change event and the associated document will be under a field named fullDocument. For more information on the properties of the root document for change streams see Change Events.
You can scope a pipeline to a particular namespace using the namespace attribute, or leave it off to have the pipeline applied to all namespaces.
[[pipeline]]
script = """
module.exports = function(ns, changeStream) {
if (changeStream) {
return [
{ $match: {"fullDocument.foo": 1} }
];
} else {
return [
{ $match: {"foo": 1} }
];
}
}
"""
Warning
You should not replace the root using $replaceRoot for a change stream since monstache needs this information. You should only make modifications to the fullDocument field in a pipeline.
Dropping¶
If the return value from the mapping function is not an object per the definition above, then the result is converted into a boolean, and if the boolean value is false then that indicates to monstache that you would not like to index the document. If the boolean value is true then the original document from MongoDB gets indexed in Elasticsearch.
This allows you to return false or null if you have implemented soft deletes in MongoDB.
[[script]]
namespace = "db.collection"
script = """
module.exports = function(doc) {
if (!!doc.deletedAt) {
return false;
}
return true;
}
"""
In the above example monstache will index any document except the ones with a deletedAt property. If the document is first inserted without a deletedAt property, but later updated to include the deletedAt property, then monstache will remove, or drop, the previously indexed document from the Elasticsearch index.
Note
Dropping a document is different from filtering a document. A filtered document is completely ignored. A dropped document results in a delete request being issued to Elasticsearch in case the document had previously been indexed.
Scripting Features¶
You may have noticed that in the first example above the exported mapping function closes over a var named counter. You can use closures to maintain state between invocations of your mapping function.
Finally, since Otto makes it so easy, the venerable Underscore library is included for you at no extra charge. Feel free to abuse the power of the _.
Embedding Documents¶
In your javascript function you have access to the following global functions to retrieve documents from MongoDB for embedding in the current document before indexing. Using this approach you can pull in related data.
function findId(documentId, [options]) {
// convenience method for findOne({_id: documentId})
// returns 1 document or null
}
function findOne(query, [options]) {
// returns 1 document or null
}
function find(query, [options]) {
// returns an array of documents or null
}
function pipe(stages, [options]) {
// returns an array of documents or null
}
Each function takes a query object parameter and an optional options object parameter.
The options object takes the following keys and values:
var options = {
database: "test",
collection: "test",
// to omit _id set the _id key to 0 in select
select: {
age: 1
},
// only applicable to find...
sort: ["name"],
limit: 2
}
If the database or collection keys are omitted from the options object, the values for database and/or collection are set to the database and collection of the document being processed.
Here are some examples:
This example sorts the documents in the same collection as the document being processed by name and returns the first 2 documents projecting only the age field. The result is set on the current document before being indexed.
[[script]]
namespace = "test.test"
script = """
module.exports = function(doc) {
doc.twoAgesSortedByName = find({}, {
sort: ["name"],
limit: 2,
select: {
age: 1
}
});
return doc;
}
"""
This example grabs a reference id from a document and replaces it with the corresponding document with that id.
[[script]]
namespace = "test.posts"
script = """
module.exports = function(post) {
if (post.author) { // author is an object id reference
post.author = findId(post.author, {
database: "test",
collection: "users"
});
}
return post;
}
"""
This example runs an aggregation pipeline and stores the results in an extra field in the document
[[script]]
namespace = "test.test"
script = """
module.exports = function(doc, ns) {
doc.extra = pipe([
{ $match: {foo: 1} },
{ $limit: 1 },
{ $project: { _id: 0, foo: 1}}
]
// optional , { database: "foo", collection: "bar"} // defaults to same namespace
);
return doc;
}
"""
Indexing Metadata¶
You can override the indexing metadata for an individual document by setting a special field named _meta_monstache on the document you return from your Javascript function.
The _meta_monstache object supports the following properties:
prop | meaning |
---|---|
routing | the routing value |
index | the name of the index to use |
type | the document type |
parent | the document parent |
version | the document version |
versionType | the document version type |
pipeline | the name of a pipeline to apply to the document |
retryOnConflict | control how retry works on conflicts |
skip | set this boolean to true to skip indexing |
id | override the ID used to index the document |
Assume there is a collection in MongoDB named company in the test database. The documents in this collection look like either
{ "_id": "london", "type": "branch", "name": "London Westminster", "city": "London", "country": "UK" }
or
{ "_id": "alice", "type": "employee", "name": "Alice Smith", "branch": "london" }
Given the above, the following snippet sets up a parent-child relationship in Elasticsearch based on the incoming documents from MongoDB and updates the ns (namespace) from test.company to company in Elasticsearch:
[[script]]
namespace = "test.company"
routing = true
script = """
module.exports = function(doc, ns) {
// var meta = { type: doc.type, index: 'company' };
var meta = { type: doc.type, index: ns.split(".")[1] };
if (doc.type === "employee") {
meta.parent = doc.branch;
}
doc._meta_monstache = meta;
return _.omit(doc, "branch", "type");
}
"""
The snippet above will route these documents to the company index in Elasticsearch instead of the default of test.company. (Because the index is derived from the namespace as ns.split(".")[1], applying this script without a namespace would route all documents to indexes named after the collection alone, without the database: db.collection in MongoDB becomes collection in Elasticsearch.) Also, instead of using company as the Elasticsearch type, the type attribute from the document will be used as the Elasticsearch type. Finally, if the type is employee then the document will be indexed as a child of the branch the person belongs to.
We can throw away the type and branch information by deleting it from the document before returning since the type information will be stored in Elasticsearch under _type and the branch information will be stored under _parent.
The example is based on the Elasticsearch docs for parent-child.
For more on updating the namespace name, check the Delete Strategy.
Routing¶
Routing is the process by which Elasticsearch determines which shard a document will reside in. Monstache supports user defined, or custom, routing of your MongoDB documents into Elasticsearch.
Consider an example where you have a comments collection in MongoDB which stores a comment and its associated post identifier.
use blog;
db.comments.insert({title: "Did you read this?", post_id: "123"});
db.comments.insert({title: "Yeah, it's good", post_id: "123"});
In this case monstache will index those 2 documents in an index named blog.comments under the id created by MongoDB. When Elasticsearch routes a document to a shard, by default, it does so by hashing the id of the document. This means that as the number of comments on post 123 grows, each of the comments will be distributed somewhat evenly between the available shards in the cluster.
Thus, when a query is performed searching among the comments for post 123 Elasticsearch will need to query all of those shards just in case a comment happened to have been routed there.
We can take advantage of the support in Elasticsearch and in monstache to do some intelligent routing such that all comments for post 123 reside in the same shard.
First we need to tell monstache that we would like to do custom routing for this collection by setting routing equal to true on a custom script for the namespace. Then we need to add some metadata to the document telling monstache how to route the document when indexing. In this case we want to route by the post_id field.
[[script]]
namespace = "blog.comments"
routing = true
script = """
module.exports = function(doc) {
doc._meta_monstache = { routing: doc.post_id };
return doc;
}
"""
Now when monstache indexes documents for the collection blog.comments it will set the special _routing attribute for the document on the index request such that Elasticsearch routes comments based on their corresponding post.
The _meta_monstache field is used only to inform monstache about routing and is not included in the source document when indexing to Elasticsearch.
Now when we are searching for comments and we know the post id that the comment belongs to we can include that post id in the request and make a search that normally queries all shards query only 1 shard.
$ curl -H "Content-Type:application/json" -XGET 'http://localhost:9200/blog.comments/_search?routing=123' -d '
{
"query":{
"match_all":{}
}
}'
You will notice in the response that only 1 shard was queried instead of all your shards. Custom routing is a great way to reduce broadcast searches and thus get better performance.
The catch with custom routing is that you need to include the routing parameter on all insert, update, and delete operations. Insert and update is not a problem for monstache because the routing information will come from your MongoDB document. Deletes, however, pose a problem for monstache because when a delete occurs in MongoDB the information in the oplog is limited to the id of the document that was deleted. But monstache needs to know where the document was originally routed in order to tell Elasticsearch where to look for it.
Monstache has 3 available strategies for handling deletes in this situation. The default strategy is stateless and uses a term query into Elasticsearch based on the ID of the document deleted in MongoDB. If the search into Elasticsearch returns exactly 1 document then monstache will schedule that document for deletion. The 2nd strategy monstache uses is stateful and requires giving monstache the ability to write to the collection monstache.meta. In this collection monstache stores information about documents that were given custom indexing metadata. This strategy slows down indexing and takes up space in MongoDB. However, it is precise because it records exactly how each document was indexed. The final strategy simply punts on deletes and leaves document deletion to the user. If you don't generally delete documents in MongoDB or don't care if Elasticsearch contains documents which have been deleted in MongoDB, this option is available. See Delete Strategy for more information.
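The following Go sketch, using the olivere/elastic v7 client, illustrates the idea behind the default stateless strategy; the index name and error handling are simplified assumptions:
package main

import (
    "context"
    "fmt"

    "github.com/olivere/elastic/v7"
)

// deleteByLookup sketches the stateless strategy: find the routed copy of a
// deleted MongoDB document by its id, and delete it only on an exact match.
func deleteByLookup(ctx context.Context, client *elastic.Client, index, id string) error {
    res, err := client.Search().
        Index(index).
        Query(elastic.NewIdsQuery().Ids(id)).
        FetchSource(false).
        Size(2).
        Do(ctx)
    if err != nil {
        return err
    }
    if len(res.Hits.Hits) != 1 {
        return fmt.Errorf("expected exactly 1 hit for %s, got %d", id, len(res.Hits.Hits))
    }
    hit := res.Hits.Hits[0]
    // the hit carries the routing needed to address the document
    _, err = client.Delete().Index(hit.Index).Id(id).Routing(hit.Routing).Do(ctx)
    return err
}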
For more information see Customizing Document Routing.
In addition to letting you customize the shard routing for a specific document, you can also customize the Elasticsearch index and type using a script by putting the custom information in the meta attribute.
[[script]]
namespace = "blog.comments"
routing = true
script = """
module.exports = function(doc) {
if (doc.score >= 100) {
// NOTE: prefix dynamic index with namespace for proper cleanup on drops
doc._meta_monstache = { index: "blog.comments.highscore", type: "highScoreComment", routing: doc.post_id };
} else {
doc._meta_monstache = { routing: doc.post_id };
}
return doc;
}
"""
Joins¶
Elasticsearch 6 introduces an updated approach to parent-child called joins. The following example shows how you can accomplish joins with Monstache. The example is based on the Elasticsearch documentation.
This example assumes Monstache is syncing the test.test collection in MongoDB with the test.test index in Elasticsearch.
First we will want to setup an index mapping in Elasticsearch describing the join field.
curl -XPUT 'localhost:9200/test.test?pretty' -H 'Content-Type: application/json' -d'
{
"mappings": {
"_doc": {
"properties": {
"my_join_field": {
"type": "join",
"relations": {
"question": "answer"
}
}
}
}
}
}
'
Warning
The above mapping uses _doc as the Elasticsearch type. _doc is the recommended type for newer versions of Elasticsearch, but it only works with Elasticsearch versions 6.2 and greater. Monstache defaults to using _doc as the type when it detects Elasticsearch version 6.2 or greater. If you are using a previous version of Elasticsearch, monstache defaults to using the MongoDB collection name as the Elasticsearch type. The type Monstache uses can be overridden, but this is not recommended from Elasticsearch 6.2 on.
Next we will configure Monstache with custom Javascript middleware that does transformation and routing, in a file called CONFIG.toml.
[[script]]
namespace = "test.test"
routing = true
script = """
module.exports = function(doc) {
var routing;
if (doc.type === "question") {
routing = doc._id;
doc.my_join_field = {
name: "question"
}
} else if (doc.type === "answer") {
routing = doc.question;
doc.my_join_field = {
name: "answer",
parent: routing
};
}
if (routing) {
doc._meta_monstache = { routing: routing };
}
return doc;
}
"""
The mapping function adds a my_join_field field to each document. The contents of the field are based on the type attribute in the MongoDB document. Also, the function ensures that the routing is always based on the _id of the question document.
Now with this config in place we can start Monstache. We will use the verbose option to see the requests.
monstache -verbose -f CONFIG.toml
With Monstache running we are now ready to insert into MongoDB
rs:PRIMARY> use test;
switched to db test
rs:PRIMARY> db.test.insert({type: "question", text: "This is a question"});
rs:PRIMARY> db.test.find()
{ "_id" : ObjectId("5a84a8b826993bde57c12893"), "type" : "question", "text" : "This is a question" }
rs:PRIMARY> db.test.insert({type: "answer", text: "This is an answer", question: ObjectId("5a84a8b826993bde57c12893") });
When we insert these documents we should see Monstache generate the following requests to Elasticsearch
{"index":{"_id":"5a84a8b826993bde57c12893","_index":"test.test","_type":"_doc","routing":"5a84a8b826993bde57c12893","version":6522523668566769665,"version_type":"external"}}
{"my_join_field":{"name":"question"},"text":"This is a question","type":"question"}
{"index":{"_id":"5a84a92b26993bde57c12894","_index":"test.test","_type":"_doc","routing":"5a84a8b826993bde57c12893","version":6522524162488008705,"version_type":"external"}}
{"my_join_field":{"name":"answer","parent":"5a84a8b826993bde57c12893"},"question":"5a84a8b826993bde57c12893","text":"This is an answer","type":"answer"}
This looks good. We should now have a parent/child relationship between these documents in Elasticsearch.
If we do a search on the test.test index we see the following results:
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "test.test",
"_type" : "_doc",
"_id" : "5a84a8b826993bde57c12893",
"_score" : 1.0,
"_routing" : "5a84a8b826993bde57c12893",
"_source" : {
"my_join_field" : {
"name" : "question"
},
"text" : "This is a question",
"type" : "question"
}
},
{
"_index" : "test.test",
"_type" : "_doc",
"_id" : "5a84a92b26993bde57c12894",
"_score" : 1.0,
"_routing" : "5a84a8b826993bde57c12893",
"_source" : {
"my_join_field" : {
"name" : "answer",
"parent" : "5a84a8b826993bde57c12893"
},
"question" : "5a84a8b826993bde57c12893",
"text" : "This is an answer",
"type" : "answer"
}
}
]
}
To clean up our documents in Elasticsearch a bit, we can omit the information that we don't really need in the source docs by updating our mapping function. This information need not be at the top level since it is duplicated in my_join_field.
return _.omit(doc, "type", "question");
If your parent and child documents are in separate MongoDB collections then you would set up a script for each collection. You can tell if the doc is a parent or child by the collection it comes from. The only other difference would be that you would need to override the index dynamically in addition to the routing such that documents from both MongoDB collections target the same index.
doc._meta_monstache = { routing: routing, index: "parentsAndChildren" };
Warning
When you route 2 or more MongoDB collections to the same Elasticsearch index, you must be careful that the document _ids across the MongoDB collections do not collide for any 2 docs, because they will be used as the _id in the target index.
Time Machines¶
If you are not just interested in the current value of a document in MongoDB, but would also like to see how it has changed over time, use time machine namespaces. For example, suppose you've inserted and later updated a document with id 123 in the test.test collection in MongoDB. If test.test is a time machine namespace you will have 2 documents representing those changes in the log.test.test.2018-02-20 index (the timestamp in the index name will vary) in Elasticsearch. If you later want all the changes made to that document in MongoDB you can issue a query like this:
$ curl -XGET 'http://localhost:9200/log.test.test.*/_search?routing=123' -d '
{
  "sort" : [
    { "_oplog_ts" : { "order" : "desc" } }
  ],
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}
      },
      "filter" : {
        "term" : {
          "_source_id" : "123"
        }
      }
    }
  }
}'
That query will be very efficient because it only queries the shard that all the change docs went to for MongoDB document id 123. It filters the documents on that shard by _source_id, the id from MongoDB, to only give us the changes to that document. Finally, it sorts by _oplog_ts, which gives us the most recent change docs first.
The index pattern in the query is a wildcard to pick up all the timestamped indexes that we've accumulated for the test.test namespace.
Merge Patches¶
A unique feature of monstache is support for JSON Merge Patches rfc-7396.
If merge patches are enabled monstache will add an additional field to documents indexed into Elasticsearch. The name of this field is configurable but it defaults to json-merge-patches.
Consider the following example with merge patches enabled...
db.test.insert({name: "Joe", age: 16, friends: [1, 2, 3]})
At this point you would have the following document source in Elasticsearch.
"_source" : {
"age" : 16,
"friends" : [
1,
2,
3
],
"json-merge-patches" : [
{
"p" : "{\"age\":16,\"friends\":[1,2,3],\"name\":\"Joe\"}",
"ts" : 1487263414,
"v" : 1
}
],
"name" : "Joe"
}
As you can see we have a single timestamped merge patch in the json-merge-patches array.
Now let's update the document to remove a friend and update the age.
db.test.update({name: "Joe"}, {$set: {age: 21, friends: [1, 3]}})
If we now look at the document in Elasticsearch we see the following:
"_source" : {
"age" : 21,
"friends" : [
1,
3
],
"json-merge-patches" : [
{
"p" : "{\"age\":16,\"friends\":[1,2,3],\"name\":\"Joe\"}",
"ts" : 1487263414,
"v" : 1
},
{
"p" : "{\"age\":21,\"friends\":[1,3]}",
"ts" : 1487263746,
"v" : 2
}
],
"name" : "Joe"
}
You can see that the document was updated as expected and an additional merge patch was added.
Each time the document is updated in MongoDB the corresponding document in Elasticsearch gains a timestamped merge patch. Using this information we can time travel through the document's history.
There is a merge patch for each version of the document. To recreate a specific version we simply need to apply the merge patches in order up to the version that we want.
To get version 1 of the document above we start with {} and apply the 1st merge patch.
To get version 2 of the document above we start with {}
- apply the 1st merge patch to get v1
- apply the 2nd merge patch to v1 to get v2
The timestamps associated with these merge patches are in seconds since the epoch, taken from the timestamp recorded in the oplog when the insert or update occurred.
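The following Go sketch replays the two patches stored in the example above with a minimal RFC 7396 merge patch implementation:
package main

import (
    "encoding/json"
    "fmt"
)

// applyMergePatch implements the object case of RFC 7396: nulls delete keys,
// nested objects recurse, everything else (including arrays) replaces.
func applyMergePatch(target, patch map[string]interface{}) map[string]interface{} {
    if target == nil {
        target = map[string]interface{}{}
    }
    for k, v := range patch {
        if v == nil {
            delete(target, k)
            continue
        }
        if po, ok := v.(map[string]interface{}); ok {
            to, _ := target[k].(map[string]interface{})
            target[k] = applyMergePatch(to, po)
            continue
        }
        target[k] = v
    }
    return target
}

func main() {
    patches := []string{
        `{"age":16,"friends":[1,2,3],"name":"Joe"}`,
        `{"age":21,"friends":[1,3]}`,
    }
    doc := map[string]interface{}{}
    for _, p := range patches { // replay v1, then v2 on top of v1
        var m map[string]interface{}
        json.Unmarshal([]byte(p), &m)
        doc = applyMergePatch(doc, m)
    }
    out, _ := json.Marshal(doc)
    fmt.Println(string(out)) // {"age":21,"friends":[1,3],"name":"Joe"}
}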
To enable the merge patches feature in monstache you need to add the following to your TOML config:
enable-patches = true
patch-namespaces = ["test.test"]
You need to add each namespace that you would like to see patches for to the patch-namespaces array.
Optionally, you can change the key under which the patches are stored in the source document as follows:
merge-patch-attribute = "custom-merge-attr"
Merge patches will only be recorded for data read from the MongoDB oplog. Data read using the direct read feature will not be enhanced with merge patches.
Most likely, you will want to turn off indexing for the merge patch attribute. You can do this by creating an index template for each patch namespace before running monstache...
PUT /_template/test.test
{
  "template" : "test.test",
  "mappings" : {
    "test" : {
      "properties" : {
        "json-merge-patches" : { "type" : "object", "enabled" : false }
      }
    }
  }
}
Systemd¶
Monstache has built-in support for integrating with systemd. The following monstache.service is an example systemd configuration.
[Unit]
Description=monstache sync service
[Service]
Type=notify
ExecStart=/usr/local/bin/monstache -f /etc/monstache/config.toml
WatchdogSec=30s
Restart=always
[Install]
WantedBy=multi-user.target
Systemd unit files are normally saved to /lib/systemd/system. Verify the correct location in your OS documentation.
After saving the monstache.service file you can run systemctl daemon-reload to tell systemd to reload all unit files.
You can enable the service to start on boot with systemctl enable monstache.service and start the service with systemctl start monstache.service.
With the configuration above monstache will notify systemd when it has started successfully and then notify systemd repeatedly at half the WatchdogSec interval to signal liveness. The configuration above causes systemd to restart monstache if it does not start or respond within the WatchdogSec interval.
Docker¶
There are Docker images available for Monstache on Docker Hub
You can pull and run the latest images with
docker run rwynn/monstache:rel6 -v
docker run rwynn/monstache:rel5 -v
You can pull and run release images with
docker run rwynn/monstache:6.7.4 -v
docker run rwynn/monstache:5.7.4 -v
For example, to run monstache via Docker with a golang plugin that resides at ~/plugin/plugin.so on the host you can use a bind mount:
docker run --rm --net=host -v ~/plugin:/tmp/plugin rwynn/monstache:6.7.4 -mapper-plugin-path /tmp/plugin/plugin.so
HTTP Server¶
Monstache has a built in HTTP server that you can enable with --enable-http-server. It listens on :8080 by default but you can change this with --http-server-addr.
When using monstache with kubernetes this server can be used to detect liveness and act accordingly
The following GET endpoints are available
/started¶
Returns the uptime of the server
/healthz¶
Returns a 200 status code with the text "ok" when monstache is running
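A minimal Go liveness probe against this endpoint, assuming the default -http-server-addr of :8080:
package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    client := &http.Client{Timeout: 5 * time.Second}
    resp, err := client.Get("http://localhost:8080/healthz")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // a 200 response means the monstache process is up
    fmt.Println("monstache live:", resp.StatusCode == http.StatusOK)
}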
/stats¶
Returns the current indexing statistics in JSON format. Only available if stats are enabled. Stats are populated by the Olivere Elasticsearch Go client
stat | meaning |
---|---|
Flushed | The number of times the flush interval has been invoked |
Committed | The number of times workers committed bulk requests |
Indexed | The number of requests indexed |
Created | The number of requests that ES reported as creates (201) |
Updated | The number of requests that ES reported as updates |
Deleted | The number of requests that ES reported as deletes |
Succeeded | The number of requests that ES reported as successful |
Failed | The number of requests that ES reported as failed |
Workers[i].Queued | The number of ES requests queued in this worker |
Workers[i].LastDuration | The duration of the last commit in nanoseconds |
/instance¶
Returns information about the running monstache process including whether or not it is currently enabled (a cluster will have one enabled process) and the most recent change event timestamp read from MongoDB.
field | meaning |
---|---|
enabled | true if this instance is enabled |
pid | The process id of the monstache process |
hostname | The hostname |
cluster | The cluster-name, if one is set |
resumeName | The resume-name, if one is set |
lastTs | Last timestamp processed from the oplog, as a BSON Timestamp object |
lastTsFormat | Formatted version of the last timestamp processed from the oplog |
/debug (if pprof is enabled)¶
If the pprof setting is enabled the following endpoints are also made available:
- /debug/pprof/
- /debug/pprof/cmdline
- /debug/pprof/profile
- /debug/pprof/symbol
- /debug/pprof/trace
MongoDB Authentication¶
See the MongoDB connection string documentation for all available authentication-related options that you can specify in the connection string.
For more information on how the MongoDB driver processes authentication configuration see the driver docs.
AWS Signature Version 4¶
Monstache has included AWS Signature Version 4 request signing. To enable the AWS Signature Version 4 support add the following to your config file:
elasticsearch-urls = ["https://<endpoint_from_aws_overview_screen>:443"]
[aws-connect]
access-key = "XXX"
secret-key = "YYY"
region = "ZZZ"
See the docs for aws-connect for the different strategies available for configuring a credential provider.
Notice how the elasticsearch-urls setting references port 443 in the connection string. This is because AWS makes your cluster available on the standard https port and not the default Elasticsearch port of 9200. If you have connection problems make sure you are using the correct port. You cannot omit the port because the driver will default to 9200 if a port is not specified.
You can read more about Signature Version 4 and Amazon Elasticsearch Service.
For information on how to obtain the access-key and secret-key required to connect you can read this blog post.
In short, you will need to create or use an existing IAM user in your AWS account. You will then need to give this user access to your Elasticsearch domain. The access-key and secret-key you put in your configuration file are those associated with the IAM user.
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::<account-id>:user/<iam-user-name>"
},
"Action": "es:*",
"Resource": "arn:aws:es:us-east-1:<account-id>:domain/<elasticsearch-domain-name>/*"
}
Watching changes on specific fields only¶
If you are using MongoDB 3.6+ you can use a change stream pipeline to only listen for change events on specific fields.
For example, if you wanted to listen for create, delete, and update events on the namespace test.test, but only wanted to sync changes when the foo or bar field changed on the doc, you could use the following configuration. If, for example, a field named count changed on the document, then that change would be ignored by monstache.
change-stream-namespaces = ["test.test"]
[[pipeline]]
namespace = "test.test"
script = """
module.exports = function(ns, changeStream) {
if (changeStream) {
return [
{
$match: {
$or: [
{ "updateDescription": {$exists: false} },
{ "updateDescription.updatedFields.foo": {$exists: true}},
{ "updateDescription.updatedFields.bar": {$exists: true}}
]
}
}
];
} else {
return [];
}
}
"""
To build complicated change stream pipelines see Change Events for information on the structure of change events. This information will shape your pipeline.
MongoDB view replication¶
You may have a situation where you want to replicate a MongoDB view in Elasticsearch, or you have a collection that should trigger a sync of another collection. You can use the relate config to do this.
Consider two collections, thing and state. A thing has an associated state and is linked to a state via a field s which points to the _id of the associated state in the state collection.
You can create a view in MongoDB that uses a $lookup to pull the state information in and present a view of things with the state information included.
use thingdb;
db.createView("thingview", "thing", [ {$lookup: {from: "state", localField: "s", foreignField: "_id", as: "s"}}])
Given this view you can use the following config to keep things up to date in a things index in Elasticsearch.
direct-read-namespaces = ["thingdb.thingview"] # read direct from the view of the collection to seed index
change-stream-namespaces = ["thingdb.thing", "thingdb.state"] # change events happen on the underlying collections not views
[[mapping]]
namespace = "thingdb.thing" # map change events on the thing collection to the things index
index = "things"
[[mapping]]
namespace = "thingdb.thingview" # map direct reads of the thingview to the same things index
index = "things"
[[relate]]
namespace = "thingdb.thing" # when a thing changes look it up in the assoicated view by _id and index that
with-namespace = "thingdb.thingview"
keep-src = false # ignore the original thing that changed and instead just use the lookup of that thing via the view
[[relate]]
namespace = "thingdb.state" # when a state changes trigger a thing change event since thing is associated to a state
with-namespace = "thingdb.thing"
src-field = "_id" # use the _id field of the state that changed to lookup associated things
match-field = "s" # only trigger change events for the things where thing.s (match-field) = state._id (src-field).
keep-src = false
Warning
Be careful of the expense of using relate with a view. In the example above, if there were many things associated to a single state then a change to that state would trigger n+1 queries to MongoDB, where n is the number of things related to the state: 1 query would be used to find all associated things and n queries would be used to look up each thing in the view.
Amazon DocumentDB (with MongoDB compatibility)¶
Monstache support for Amazon DocumentDB is currently experimental. Support for the change streams API in MongoDB was recently added to Amazon DocumentDB. Consult the DocumentDB documentation for instructions on enabling change streams for your collections.
Since Amazon DocumentDB only supports compatibility with MongoDB API 3.6 you will want to ensure that your change stream configuration targets collections and that your resume strategy is set to use tokens and not the default of timestamps.
Ensure that your MongoDB connection URI is set with a primary read preference, e.g. ?readPreference=primary.
# ensure you target collections in your change stream namespaces
change-stream-namespaces = ["db1.col1", "db2.col2"]
# ensure that resuming, if enabled, is done based on tokens and not timestamps
resume = true
resume-strategy = 1