Events

Esse ships with a built-in pub/sub instrumentation layer. Every operation against Elasticsearch/OpenSearch emits a named event with a payload. Subscribe to events to instrument, log, or react.

Subscribing

Two forms are supported:

Block subscription

Esse::Events.subscribe('elasticsearch.bulk') do |event|
  puts "Bulk request: #{event.payload[:runtime]}s"
end

Listener subscription

Any object that defines on_<event_name> methods, where the event name has its dots replaced with underscores (elasticsearch.bulk becomes on_elasticsearch_bulk), is auto-subscribed to the matching events:

class LoggerListener
  def on_elasticsearch_bulk(event)
    puts "bulk: #{event.payload[:body_size]}b"
  end

  def on_elasticsearch_search(event)
    puts "search: #{event.payload[:runtime]}s"
  end
end

listener = LoggerListener.new
Esse::Events.subscribe(listener)
Esse::Events.subscribed?(listener) # => true
Esse::Events.unsubscribe(listener)
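
The naming rule for listener methods can be sketched in plain Ruby. This only illustrates the convention and is not Esse's actual implementation; `listener_method_for`, `dispatch`, and `RecordingListener` are hypothetical names introduced for the example.

```ruby
# Illustrative only: how an event name could map to a listener method.
# `listener_method_for` and `dispatch` are hypothetical helpers, not Esse APIs.
def listener_method_for(event_name)
  # 'elasticsearch.bulk' -> :on_elasticsearch_bulk
  :"on_#{event_name.tr('.', '_')}"
end

def dispatch(listener, event_name, event)
  method_name = listener_method_for(event_name)
  # Only call the listener if it defines a handler for this event.
  return false unless listener.respond_to?(method_name)

  listener.public_send(method_name, event)
  true
end

# A listener that records what it receives, so the dispatch is observable.
class RecordingListener
  attr_reader :seen

  def initialize
    @seen = []
  end

  def on_elasticsearch_bulk(event)
    @seen << event
  end
end

listener = RecordingListener.new
dispatch(listener, 'elasticsearch.bulk', { runtime: 0.12 })   # handled
dispatch(listener, 'elasticsearch.search', { runtime: 0.03 }) # ignored: no handler
```

A listener only needs to define handlers for the events it cares about; in this sketch, events without a matching method are skipped.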

Available events

All events live under the elasticsearch.* namespace.

Documents

| Event | When |
| --- | --- |
| elasticsearch.index | Single-document index |
| elasticsearch.update | Single-document update |
| elasticsearch.delete | Single-document delete |
| elasticsearch.get | Single-document fetch |
| elasticsearch.mget | Multi-get |
| elasticsearch.exist | Exists check |
| elasticsearch.count | Count query |
| elasticsearch.bulk | _bulk API call |

Indices

| Event | When |
| --- | --- |
| elasticsearch.create_index | indices.create |
| elasticsearch.delete_index | indices.delete |
| elasticsearch.index_exist | indices.exists |
| elasticsearch.close | indices.close |
| elasticsearch.open | indices.open |
| elasticsearch.refresh | indices.refresh |
| elasticsearch.update_settings | indices.put_settings |
| elasticsearch.update_mapping | indices.put_mapping |
| elasticsearch.update_aliases | indices.update_aliases |

Search and query operations

| Event | When |
| --- | --- |
| elasticsearch.search | _search API |
| elasticsearch.execute_search_query | Internal search execution |
| elasticsearch.reindex | _reindex API |
| elasticsearch.update_by_query | _update_by_query |
| elasticsearch.delete_by_query | _delete_by_query |

Tasks

| Event | When |
| --- | --- |
| elasticsearch.tasks | List tasks |
| elasticsearch.task | Single-task query |
| elasticsearch.cancel_task | Cancel task |

Event payload

Every event is an object with a payload hash. Common keys:

| Key | Description |
| --- | --- |
| :request | Request parameters sent to ES/OS |
| :response | Response hash |
| :runtime | Duration in seconds |
| :error | Exception if the call failed |
| :__started_at__ | Internal start time (Time instance) |

Operation-specific payloads may include :body_size, :document_count, :index, :type, etc.
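
Because only some keys are present on any given event, subscribers usually read the payload defensively. The sketch below formats a one-line summary from the common keys listed above; `summarize_payload` is a hypothetical helper, and the payload is passed as a plain hash so the example runs without Esse loaded.

```ruby
# Illustrative only: format a one-line summary from an event payload.
# `summarize_payload` is a hypothetical helper, not part of Esse.
def summarize_payload(event_name, payload)
  parts = [event_name]
  # :runtime is in seconds; convert to milliseconds for readability.
  parts << format('%.1fms', payload[:runtime] * 1000) if payload[:runtime]
  # :error is only present when the call failed.
  parts << "error=#{payload[:error].class}" if payload[:error]
  parts.join(' ')
end

puts summarize_payload('elasticsearch.search', { runtime: 0.0781 })
puts summarize_payload('elasticsearch.get', { runtime: 0.004, error: RuntimeError.new('boom') })
```

In a real subscriber you would pass `event.payload` as the second argument.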

Publishing

Esse itself publishes events via Esse::Events.instrument:

Esse::Events.instrument('elasticsearch.bulk') do |payload|
  payload[:body_size] = body.bytesize
  response = client.bulk(body: body)
  payload[:response] = response
  response
end

instrument records runtime automatically and ensures the event fires even if the block raises.
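
To make that behaviour concrete, here is a minimal stand-in for an instrument-style helper. It is a sketch under assumptions, not how Esse::Events is implemented: `TinyBus` is hypothetical, and its subscribers receive the payload hash directly rather than an event object.

```ruby
# Illustrative only: a tiny stand-in for an instrument-style helper.
# `TinyBus` is hypothetical and is not how Esse::Events is implemented.
class TinyBus
  def initialize
    @subscribers = Hash.new { |hash, name| hash[name] = [] }
  end

  def subscribe(name, &block)
    @subscribers[name] << block
  end

  # Yields the payload to the block, returns the block's value, and
  # notifies subscribers whether the block succeeds or raises.
  def instrument(name, payload = {})
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield payload
  rescue StandardError => e
    payload[:error] = e
    raise
  ensure
    # Runs on success and on failure, so subscribers always see the event.
    payload[:runtime] = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    @subscribers[name].each { |subscriber| subscriber.call(payload) }
  end
end

bus = TinyBus.new
bus.subscribe('demo.work') do |payload|
  puts "runtime=#{payload[:runtime].round(3)}s error=#{payload[:error].inspect}"
end

bus.instrument('demo.work') { |payload| payload[:count] = 3 }

begin
  bus.instrument('demo.work') { raise ArgumentError, 'boom' }
rescue ArgumentError
  # The subscriber has already been called with payload[:error] set.
end
```

The `ensure` clause is what guarantees delivery: the exception still propagates to the caller, but subscribers see the event first, with the runtime and the error attached.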

You can publish custom events too:

Esse::Events.publish('myapp.reindex_batch', batch_id: 42, count: 1_000)

Patterns

Log every ES call

Esse::Events.event_names.grep(/^elasticsearch/).each do |event_name|
  Esse::Events.subscribe(event_name) do |event|
    Rails.logger.debug "#{event_name}: #{event.payload[:runtime]}s"
  end
end

Track search latency in Rails

The esse-rails gem already subscribes to every elasticsearch.* event and surfaces the accumulated runtime in your controller logs:

Completed 200 OK in 125.3ms (Views: 45.2ms | Search: 78.1ms)
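
If you are not using esse-rails, a similar per-request total can be kept by hand. The sketch below is an assumption about one reasonable approach, not a description of how esse-rails works internally; `SearchRuntime` is a hypothetical module.

```ruby
# Illustrative only: accumulate runtimes for the current request.
# `SearchRuntime` is hypothetical. Thread.current[] storage is fiber-local
# in Ruby, which keeps totals from concurrent requests separate.
module SearchRuntime
  KEY = :search_runtime_seconds

  def self.add(seconds)
    Thread.current[KEY] = total + seconds
  end

  def self.total
    Thread.current[KEY] || 0.0
  end

  # Return the accumulated total and reset it for the next request.
  def self.reset!
    value = total
    Thread.current[KEY] = 0.0
    value
  end
end

# Simulate two search events in one request, then report the total.
[0.030, 0.0481].each { |runtime| SearchRuntime.add(runtime) }
puts format('Search: %.1fms', SearchRuntime.reset! * 1000)
```

In an application, a block subscriber would call `SearchRuntime.add(event.payload[:runtime])`, and the code that writes the request log line would call `SearchRuntime.reset!`.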

Fail-fast on bulk errors

Esse::Events.subscribe('elasticsearch.bulk') do |event|
  response = event.payload[:response]
  next unless response

  # Assumes the response hash uses symbol keys; switch to 'items' and 'error' for string keys.
  items = response[:items] || []
  failed = items.select { |item| item.values.first[:error] }
  Sentry.capture_message('Bulk had failed items', extra: { failed: failed }) if failed.any?
end
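
The filtering step can be pulled out and tested on its own. This is a sketch under the assumption that the response follows the usual bulk shape (an `items` array whose entries each have a single action key); `failed_bulk_items` is a hypothetical helper, and it accepts both string and symbol keys because the key type depends on how the response was parsed.

```ruby
# Illustrative only: collect failed items from a bulk response hash.
# `failed_bulk_items` is a hypothetical helper, not part of Esse.
def failed_bulk_items(response)
  items = response['items'] || response[:items] || []
  items.select do |item|
    # Each item has a single key (the action: index, create, update, delete).
    result = item.values.first || {}
    result.key?('error') || result.key?(:error)
  end
end

# Sample data in the shape of a bulk response with one failed item.
response = {
  'errors' => true,
  'items' => [
    { 'index' => { '_id' => '1', 'status' => 201 } },
    { 'index' => { '_id' => '2', 'status' => 400,
                   'error' => { 'type' => 'mapper_parsing_exception' } } }
  ]
}

failed_bulk_items(response).each do |item|
  action, result = item.first
  puts "#{action} #{result['_id']} failed: #{result['error']['type']}"
end
```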

Subscription lifecycle

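Subscriptions are stored in memory and belong to the current process. In a multi-process setup (for example Puma in cluster mode, or several Sidekiq processes), register subscribers in every worker process, such as in an initializer or a worker-boot hook, so that no process runs without them.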

Esse::Events.subscribers # all subscribers
Esse::Events.unsubscribe(subscriber)
Esse::Events.subscribed?(subscriber)
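
When registration code can run more than once in the same process, a guard built on subscribed? avoids registering the same listener twice. The sketch below assumes only the subscribe and subscribed? calls shown above; `ensure_subscribed` and `FakeBus` are hypothetical, and `FakeBus` exists solely so the example runs without Esse loaded.

```ruby
# Illustrative only: subscribe a listener at most once per process.
# `ensure_subscribed` is a hypothetical helper; `bus` is any object that
# responds to `subscribed?` and `subscribe` (Esse::Events in a real app).
def ensure_subscribed(bus, listener)
  return false if bus.subscribed?(listener)

  bus.subscribe(listener)
  true
end

# A stand-in bus so the sketch runs without Esse loaded.
class FakeBus
  def initialize
    @listeners = []
  end

  def subscribe(listener)
    @listeners << listener
  end

  def subscribed?(listener)
    @listeners.include?(listener)
  end
end

bus = FakeBus.new
listener = Object.new
ensure_subscribed(bus, listener) # registers the listener
ensure_subscribed(bus, listener) # no-op: already registered
```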