eventutilities/src/main/java/org/wikimedia/eventutilities/monitoring/CanaryEventProducer.java - wikimedia-event-utilities - Gitiles
gerrit.wikimedia.org
wikimedia-event-utilities
refs/heads/master
eventutilities
src
main
java
org
wikimedia
eventutilities
monitoring
CanaryEventProducer.java
blob: 4bd53ba73320931eaa3ef776ae31d1a5dae0f792 [
file
] [
log
] [
blame
] [
edit
package
org
wikimedia
eventutilities
monitoring
import
static
com
google
common
base
Preconditions
checkArgument
import
static
com
google
common
collect
ImmutableList
toImmutableList
import
static
com
google
common
collect
ImmutableMap
toImmutableMap
import
java
net
URI
import
java
util
HashMap
import
java
util
List
import
java
util
Map
import
java
util
stream
Collectors
import
javax
annotation
Nonnull
import
org
joda
time
DateTime
import
org
wikimedia
eventutilities
core
event
EventSchemaLoader
import
org
wikimedia
eventutilities
core
event
EventStream
import
org
wikimedia
eventutilities
core
event
EventStreamConfig
import
org
wikimedia
eventutilities
core
event
EventStreamFactory
import
org
wikimedia
eventutilities
core
http
BasicHttpClient
import
org
wikimedia
eventutilities
core
http
BasicHttpResult
import
com
fasterxml
jackson
databind
ObjectMapper
import
com
fasterxml
jackson
databind
node
ArrayNode
import
com
fasterxml
jackson
databind
node
JsonNodeFactory
import
com
fasterxml
jackson
databind
node
ObjectNode
import
com
google
common
collect
ImmutableList
import
edu
umd
cs
findbugs
annotations
SuppressFBWarnings
/**
* Uses an EventStreamFactory to create and POST Wikimedia canary events to
* Wikimedia event services. Canary events are constructed from the first entry in the
* event schema's examples field. It is expected that event schemas have the fields listed
* at https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#Required_fields, and
* also that the receiving event intake service (e.g. EventGate) will set meta.dt if it is
* not present in the event.
*/
public
class
CanaryEventProducer
/**
* EventStreamFactory instance used when constructing EventStreams.
*/
protected
final
EventStreamFactory
eventStreamFactory
/**
* List of data center names that will be used to look up event service urls.
*/
protected
static
final
List
String
DATACENTERS
ImmutableList
of
"eqiad"
"codfw"
);
/**
* Used for serializing JsonNode events to Strings.
*/
protected
static
final
ObjectMapper
OBJECT_MAPPER
new
ObjectMapper
();
/**
* Will be used as the value of meta.domain when building canary events.
* It does not matter what this is, but it should be consistent.
*/
protected
static
final
String
CANARY_DOMAIN
"canary"
private
final
BasicHttpClient
httpClient
/**
* Constructs a new instance of CanaryEventProducer with a new instance of EventStreamFactory
* from eventSchemaLoader and eventStreamConfig.
*/
public
CanaryEventProducer
EventSchemaLoader
eventSchemaLoader
EventStreamConfig
eventStreamConfig
BasicHttpClient
httpClient
this
EventStreamFactory
builder
()
setEventSchemaLoader
eventSchemaLoader
setEventStreamConfig
eventStreamConfig
build
(),
httpClient
);
/**
* Constructs a new CanaryEventProducer using the provided EventStreamFactory.
*/
public
CanaryEventProducer
EventStreamFactory
eventStreamFactory
BasicHttpClient
client
this
eventStreamFactory
eventStreamFactory
this
httpClient
client
/**
* Returns the EventStreamFactory this CanaryEventProducer is using.
*/
@Nonnull
public
EventStreamFactory
getEventStreamFactory
()
return
eventStreamFactory
/**
* Given a streamName, gets its schema and uses the JSONSchema examples to make a canary event.
*/
@Nonnull
public
ObjectNode
canaryEvent
String
streamName
return
canaryEvent
eventStreamFactory
createEventStream
streamName
));
@Nonnull
public
ObjectNode
canaryEvent
String
streamName
DateTime
timestamp
return
canaryEvent
eventStreamFactory
createEventStream
streamName
),
timestamp
);
/**
* Given an EventStream, gets its schema and uses the JSONSchema examples to make a canary event.
*/
@Nonnull
public
ObjectNode
canaryEvent
EventStream
es
return
makeCanaryEvent
es
streamName
(),
es
exampleEvent
()
);
/**
* Given an EventStream, gets its schema and uses the JSONSchema examples to make a canary event.
*/
@Nonnull
public
ObjectNode
canaryEvent
EventStream
es
DateTime
timestamp
return
makeCanaryEvent
es
streamName
(),
es
exampleEvent
(),
timestamp
);
/**
* Creates a canary event from an example event for a stream.
*/
@Nonnull
@SuppressFBWarnings
value
"OCP_OVERLY_CONCRETE_PARAMETER"
justification
"Precise typing is useful for intention"
protected
static
ObjectNode
makeCanaryEvent
String
streamName
ObjectNode
example
DateTime
timestamp
checkArgument
example
!=
null
"Cannot make canary event for %s, example is null."
streamName
);
ObjectNode
canaryEvent
example
deepCopy
();
ObjectNode
canaryMeta
ObjectNode
canaryEvent
get
"meta"
);
canaryMeta
set
"domain"
JsonNodeFactory
instance
textNode
CANARY_DOMAIN
));
canaryMeta
set
"stream"
JsonNodeFactory
instance
textNode
streamName
));
if
timestamp
==
null
// Remove meta.dt so it is set by the Event Service we will POST this event to.
canaryMeta
remove
"dt"
);
else
// Set meta.dt at provided timestamp value.
canaryMeta
set
"dt"
JsonNodeFactory
instance
textNode
timestamp
toString
()));
canaryEvent
set
"meta"
canaryMeta
);
return
canaryEvent
protected
static
ObjectNode
makeCanaryEvent
String
streamName
ObjectNode
example
return
makeCanaryEvent
streamName
example
null
);
/**
* Gets canary events to POST for all streams that EventStreamConfig knows about.
* Refer to docs for getCanaryEventsToPostForStreams(eventStreams).
*/
@Nonnull
public
Map
URI
List
ObjectNode
>>
getAllCanaryEventsToPost
()
return
getCanaryEventsToPost
eventStreamFactory
getEventStreamConfig
().
cachedStreamNames
()
);
/**
* Gets canary events to POST for a single stream.
* Refer to docs for getCanaryEventsToPostForStreams(eventStreams).
*/
@Nonnull
public
Map
URI
List
ObjectNode
>>
getCanaryEventsToPost
String
streamName
return
getCanaryEventsToPost
ImmutableList
of
streamName
));
/**
* Gets canary events to POST for a single stream.
* Refer to docs for getCanaryEventsToPostForStreams(eventStreams).
*/
@Nonnull
public
Map
URI
List
ObjectNode
>>
getCanaryEventsToPost
String
streamName
DateTime
timestamp
return
getCanaryEventsToPost
ImmutableList
of
streamName
),
timestamp
);
/**
* Gets canary events to POST for a List of stream names.
* Refer to docs for getCanaryEventsToPostForStreams(eventStreams).
*/
@Nonnull
public
Map
URI
List
ObjectNode
>>
getCanaryEventsToPost
List
String
streamNames
return
getCanaryEventsToPostForStreams
eventStreamFactory
createEventStreams
streamNames
),
null
);
/**
* Gets canary events to POST for a List of stream names.
* Refer to docs for getCanaryEventsToPostForStreams(eventStreams).
*/
@Nonnull
public
Map
URI
List
ObjectNode
>>
getCanaryEventsToPost
List
String
streamNames
DateTime
timestamp
return
getCanaryEventsToPostForStreams
eventStreamFactory
createEventStreams
streamNames
),
timestamp
);
/**
* Given a list of streams and a timestamp, this will return a map of
* datacenter specific event service URIs to a list of canary
* events that should be POSTed to that event service.
* These can then be iterated through and posted to each
* event service URI to post expected canary events for each stream.
* If timestamp is null, the meta.dt field of canary events is removed
* to be set by EventGate, otherwise it is set to the provided value.
*/
@Nonnull
public
Map
URI
List
ObjectNode
>>
getCanaryEventsToPostForStreams
List
EventStream
eventStreams
DateTime
timestamp
// Build a map of datacenter specific event service url to EventStreams
Map
URI
List
EventStream
>>
eventStreamsByEventServiceUrl
new
HashMap
<>();
for
String
datacenter
DATACENTERS
eventStreamsByEventServiceUrl
putAll
eventStreams
stream
().
collect
Collectors
groupingBy
eventStream
->
eventStream
eventServiceUri
datacenter
))
);
// Convert the Map of URIs -> EventStreams to URIs -> canary events.
// Each set of canary events can be POSTed to their keyed
// event service url.
return
eventStreamsByEventServiceUrl
entrySet
().
stream
()
collect
toImmutableMap
Map
Entry
::
getKey
entry
->
entry
getValue
().
stream
()
map
eventStream
->
canaryEvent
eventStream
timestamp
))
collect
toImmutableList
())
));
/**
* POSTs canary events for all known streams.
* Refer to docs for postCanaryEVents(streamNames).
* Refer to docs for postCanaryEventsForStreams(eventStreams).
*/
@Nonnull
public
Map
URI
BasicHttpResult
postAllCanaryEvents
()
return
postCanaryEvents
eventStreamFactory
getEventStreamConfig
().
cachedStreamNames
()
);
/**
* Posts canary events for a single streamName.
* Refer to docs for postCanaryEventsForStreams(eventStreams).
*/
@Nonnull
public
Map
URI
BasicHttpResult
postCanaryEvents
String
streamName
return
postCanaryEvents
ImmutableList
of
streamName
));
/**
* Posts canary events for each named event stream.
* Refer to docs for postCanaryEventsForStreams(eventStreams).
*/
@Nonnull
public
Map
URI
BasicHttpResult
postCanaryEvents
List
String
streamNames
return
postCanaryEventsForStreams
eventStreamFactory
createEventStreams
streamNames
));
/**
* Gets canary events for each eventStream, POSTs them to the appropriate
* event service url(s), and collects the results of each POST
* into a Map of event service url to result ObjectNode.
* We want to attempt every POST we are supposed to do without bailing
* when an error is encountered. This is why the results are collected in
* this way The results should be examined after this method returns
* to check for any failures.
*/
@Nonnull
public
Map
URI
BasicHttpResult
postCanaryEventsForStreams
List
EventStream
eventStreams
return
postEventsToUris
getCanaryEventsToPostForStreams
eventStreams
null
));
/**
* Given a List of ObjectNodes, returns an ArrayNode of those ObjectNodes.
*/
@Nonnull
public
static
ArrayNode
eventsToArrayNode
List
ObjectNode
events
ArrayNode
eventsArray
JsonNodeFactory
instance
arrayNode
();
for
ObjectNode
event
events
eventsArray
add
event
);
return
eventsArray
/**
* Iterates over the Map of URI to events and posts events to the URI.
*/
@Nonnull
public
Map
URI
BasicHttpResult
postEventsToUris
Map
URI
List
ObjectNode
>>
uriToEvents
return
uriToEvents
entrySet
().
stream
()
collect
toImmutableMap
Map
Entry
::
getKey
entry
->
postEvents
entry
getKey
(),
entry
getValue
())
));
/**
* POSTs the given list of
* events to the eventServiceUri.
* Expects that eventServiceUri returns a JSON response.
* EventGate returns 201 if guaranteed success, 202 if hasty success,
* and 207 if partial success (some events were accepted, others were not).
* We want to only consider 201 and 202 as full success so we pass
* httpPostJson a custom isSuccess function to determine this.
* https://github.com/wikimedia/eventgate/blob/master/spec.yaml#L72
* The returned BasicHttpResult will look like:
* success: true,
* status 201,
* message: "HTTP response message",
* body: response body if any
* }
* If ANY events failed POSTing, success will be false, and the reasons
* for the failure will be in message and body.
* If there is a local exception during POSTing, success will be false
* and the Exception message will be in message, and in the exception field
* will have the original Exception.
*/
@Nonnull
public
BasicHttpResult
postEvents
URI eventServiceUri
List
ObjectNode
events
// Convert List of events to ArrayNode of events to allow
// jackson to serialize them as an array of events.
ArrayNode
eventsArray
eventsToArrayNode
events
);
return
httpClient
post
eventServiceUri
OBJECT_MAPPER
eventsArray
// Only consider 201 and 202 from EventGate as fully successful POSTs.
statusCode
->
statusCode
==
201
||
statusCode
==
202
);