Exercise 6 Message Streaming: Handling Emergency Phone Call Streams
Data Event Messages (JSON)
Overall Goal Create a workspace to read, parse, and filter WebSocket messages; keeping only events that affect transit stations.
Demonstrates Creating a workspace to handle a message stream
Start Workspace C:\FMEData2017\Workspaces\ServerAuthoring\DataStream-Ex1-Begin.fmw
End Workspace C:\FMEData2017\Workspaces\ServerAuthoring\DataStream-Ex1-Generate-Complete.fmw
C:\FMEData2017\Workspaces\ServerAuthoring\DataStream-Ex1-Process-Complete.fmw

As a technical analyst in the GIS department you deal with spatial data. Sometimes you need to process that data in real-time and sometimes that data can arrive in great quantities and at great speed.

In one such case, the city has been given access to the monitoring systems of emergency services. That means the ability to access in real-time information about all emergency calls.

By emergency calls we mean the equivalent of 911 calls in North America, 999 in the UK, 112 in most of Europe, and 000 in Australia.

Of course, these calls can arrive at a tremendous rate, and at unknown intervals. If the city wishes to respond to any of these, and even if they wish to just record a history of the calls, you must implement a message streaming setup in FME Server.



1) Open Workspace
Unfortunately (I'm talking from a training point of view) we don't have access to a real-time stream of emergency phone calls, so we will have to generate our own.

Open the workspace C:\FMEData2017\Workspaces\ServerAuthoring\DataStream-Ex1-Begin.fmw


Notice that the workspace generates a stream of events. A random number of events are generated, at random times, and at random locations. Additionally random severity and event type attributes are generated.

Each event is wrapped up into a JSON format message. All that we need to do is push that message out as a stream.


Miss Vector says...
This workspace is just generating "events". Those events could be lightning strikes, vehicle locations, traffic accidents, or even UFO sightings! For this exercise, we'll pretend they are emergency phone calls. In real life you would be connecting to an existing stream of data, and wouldn't need to generate one in this way.


2) Add WebSocketSender Transformer
Add a WebSocketSender transformer after the JSONTemplater. Inspect the parameters and set them as follows:

WebSocket Server URLws://localhost:7078
Verify SSL CertificatesNo
Connection Preamble
{
    ws_op: "open",
    ws_stream_id: "EmergencyEvents"
}
Data To Transmit
{
    ws_op: 'send',
    ws_msg: '@Value(EventMessage)'
}

As you can see, these parameters open a WebSocket connection (to an EmergencyEvents stream) and send information (the EventMessage attribute). Save the parameters and then save the workspace.


3) Create Workspace
Now we have the ability to generate a stream of data we will create the workspace that is to process the data. Start Workbench and begin with a blank canvas (don't close the stream generator workspace, as we'll need that as well in a moment).

In the blank canvas add a Creator transformer and follow it with a WebSocketReceiver. Inspect the WebSocketReceiver transformer parameters and set them as follows:

WebSocket Server URLws://localhost:7078
Verify SSL CertificatesNo
Connection Preamble
{
    ws_op: "open",
    ws_stream_id: "EmergencyEvents"
}
Output AttributeIncomingMessage

Save the changes and add a Logger transformer after the WebSocketReceiver.


4) Publish Workspaces
Let's test what we have by publishing the workspaces and running them on FME Server.

Publish each workspace in turn. In both cases simply register it with the Job Submitter service. There are no datasets or other parameters we need worry about.


5) Run Workspace
Log in to the FME Server web interface, locate the data stream generator workspace, and run it. The dialog in response will look like this:

The workspace will run for a long time and we can leave it to do so. Leave this page by clicking the Run Workspace button on the main menu and - within the Run Workspace page - locate the processing workspace. Now run that.

Again the response will report that the workspace is running, and will continue to do so.


6) Check Jobs and Cancel
Navigate to the Jobs page and click the tab labelled Running. You will see the two jobs:

Let the jobs run for a minute or two. Then choose each of them and click the Cancel button to cancel them:

Once cancelled, go to the Completed jobs tab. You'll see the two cancelled jobs:

Click on the processing workspace job and check the log. You should see messages in the log like this:

|===========================================================================
INFORM|WebSocketReceiver_Output: Feature is:
INFORM|+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
INFORM|Feature Type: `WebSocketReceiver_Output_LOGGED'
INFORM|Attribute(encoded: utf-8): `IncomingMessage' has value `{ "EventID" : 6.....
INFORM|Attribute(string)        : `fme_geometry' has value `fme_undefined'
INFORM|Attribute(encoded: utf-8): `fme_type' has value `fme_no_geom'
INFORM|Geometry Type: Unknown (0)

This proves that the WebSocketReceiver is acting as expected and receiving messages from the message stream.


Miss Vector says...
You've proved that you can create a workspace to process a message stream, which is the important part of this exercise. But if you have the time, let's see what improvements we can add to make the result more realistic.


7) Add JSONFlattener
The first thing to do with incoming messages is to extract information as attributes. Because the incoming data is JSON format, add a JSONFlattener transformer to the processing workspace, after the WebSocketReceiver.

Inspect the JSONFlattener's parameters and set the attribute IncomingMessage as the JSON Document to process.

Under Attributes to Expose manually enter:

  • EventID
  • EventLocation.EventXCoord
  • EventLocation.EventYCoord
  • EventSeverity
  • EventType

You will now have the information from the message available as a set of attributes in the workspace.


8) Add VertexCreator
Now add a VertexCreator transformer. Set it up to use the X/Y attributes to create a true point feature:

With this we now have a true geographic feature and can process it as required.


9) Add Reader
The public transportation team within the city has learned you are working with this emergency data. They wish to be alerted immediately if there is an emergency event within 200 metres of a transit station. Let's show them how easy it is to set this up.

Firstly we need the transit station data, so select Readers > Add Reader and add the following:

Reader Format Esri Geodatabase (File Geodb Open API)
Reader Dataset C:\FMEData2017\Data\CommunityMapping\CommunityMap.gdb

When prompted (or in the parameters dialog) ensure that only the TransitStations table is selected.


10) Filter Data
Now let's filter the emergencies.

First, add a Bufferer transformer to the TransitStation feature type and buffer the features by 200 metres.

Secondly, add a PointOnAreaOverlayer to assess whether an emergency falls inside one of these buffers. The workspace will now look like this:

At the moment there is one big problem that stops this from working. The PointOnAreaOverlayer transformer is a Group-Based transformer, sometimes called a "blocker". It will hold on to features until it has finished being fed them, before outputting any data. In our case we want to make it Feature-Based; i.e. it will process each message at once.

So, inspect the PointOnAreaOverlayer parameters and set Areas First to Yes:

This tells the transformer that all area features (buffered stations) will be first to arrive, therefore any point features (message locations) can be processed immediately.

However, we have to ensure that the transit features will arrive first. Therefore inspect the transformer parameters for the Creator transformer and set Create at End to Yes:

Now, all being well, the transit features will arrive first at the PointOnAreaOverlayer transformer.

Finally, add a Tester transformer after the PointOnAreaOverlayer. Set up the test to check for _overlaps > 0 (i.e. where the message location falls inside a transit station buffer). Connect some Logger transformers to the Tester output ports:

Note that, if there were other parameters (for example the transit team were only interested in Event Types 7, 8, 9, and 10) you could add them to this Tester as well.


11) Publish Workspaces
Now publish the two workspaces again (you may or may not have to upload the TransitStation Geodatabase along with the workspace) and run them using the same process as before, but leave it for a few minutes longer, as it may take a while for one of the random events to fall inside a transit station buffer.

Once stopped, check the logs and you should see that messages falling within 200 metres of a transit station are logged (with a different header).


Miss Vector says...
If you want to adjust the settings to get a result quicker, then go ahead. For example, you might set the buffer size to 500 metres instead of 200, or you might reduce the interval time on the message generator. Feel free to make whatever parameter changes you like to test the setup. You could even bypass the Decelerator transformer (in the data-stream creation workspace) to see how fast FME can deal with the incoming messages! But if you do that, be sure to start the processing workspace first, else the generator might finish by the time you do get the processor started!


12) Add Writer
The messages that are being received are not all being used by the transit team, but we should probably keep a record of them. So - back in FME Workbench - select Writers > Add Writer from the menubar. Use the following parameters to add a database Writer to the processing workspace:

Writer Format SpatiaLite
Writer Dataset C:\FMEData2017\Output\EventMessages.sl3
Writer Parameters Advanced : Features Per Transaction = 1
Add Feature Type(s) Table Definition: Automatic

In the newly added feature type, change the name to events and close the dialog. Connect the feature type to the VertexCreator output port (i.e. we're recording all events, not just the filtered ones):

The attributes are added automatically, but include a few we don't need. So open up the properties dialog again for the feature type and click the User Attributes tab. Change it from Automatic to Manual and delete the attributes:

  • _creation_instance
  • incomingmessage
  • eventlocation_eventxcoord
  • eventlocation_eventycoord

Notice that the attributes were automatically renamed (to lower case and removing disallowed characters) to match SpatiaLite requirements.

If you publish and run the workspace (you may need to set the SpatiaLite database output to be written to a Resources folder) now you should be able to see - while the workspace is still running - the results being added to the database. You can inspect the file in the FME Data Inspector to prove this.


13) Create Notification
One last task (I promise). The filtered messages are important to the transit team, but at the moment they are going nowhere. We should set up a way to inform them.

We could add another messaging transformer, such as the WebSocketSender, JMSSender, SQSSender, or even a Tweeter. That would make the processing workspace a "pure" messaging workspace.

On the other hand, the outgoing messages are nothing like the same rate as the incoming messages. With the parameters as described in this exercise, there is only a transit message once every minute. So, we can create a "hybrid" solution by setting output messages to be sent via the FME Server Notification Service.

Go to the FME Server web interface and navigate to the Notifications page.

Create a new Topic called EmergencyTransitMessages:

Now create a new notification Subscription tied to that topic. There are various protocols we could realistically use for sending a message (email springs to mind) but for the purposes of this exercise use the Logger protocol. Set the Log Level parameter to High:


14) Add FMEServerNotifier Transformer
Back in the processing workspace in Workbench, remove any Logger transformers at the end of the workspace. Add an FMEServerNotifier transformer connected to the Tester:Passed port:

Inspect the transformer parameters and set it up to send a message to the EmergencyTransitMessages topic. Set the message content to be whatever you like. You could use the text editor dialog to create something out of the available attributes (it can be plain text, it doesn't have to be JSON or XML).


15) Publish and Run Workspaces
Re-publish and set the workspaces running again. Navigate to the Notifications page and click on the Topics tab. Enable Topic Monitoring to watch the EmergencyTransitMessages topic for incoming notifications.

In a short while you will start to see emergency messages like this:

Visit Resources > Logs > core > current > subscribers > logger.log to find the results as recorded by the Logger protocol notification.


CONGRATULATIONS
By completing this exercise you have learned how to:
  • Send and receive messages via WebSockets
  • Publish and run message-streaming workspaces
  • Cancel message-streaming workspaces and check their log files
  • Extract attributes from JSON messages
  • Use transformers to transform and filter a message according to its content
  • Set up workspaces to handle group-based transformers in a real-time scenario
  • Record incoming messages into a database
  • Set up a hybrid system with message streaming and notifications

results matching ""

    No results matching ""