Skip to content

Commit 5ad6153

Browse files
committed
Implement event handling for news data updates and manage multiple fragments
1 parent 111b405 commit 5ad6153

1 file changed

Lines changed: 133 additions & 4 deletions

File tree

src/app.py

Lines changed: 133 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,144 @@
77

88

99
#!/usr/bin/env python
10-
import os
1110
import lseg.data as ld
12-
from lseg.data import session
11+
from lseg.data.delivery import omm_stream
1312
import datetime
13+
import time
1414
import json
1515
import base64
1616
import zlib
17+
import binascii
1718

19+
# list to contain the news envelopes
20+
_news_envelopes = []
21+
RIC_CODE = 'MRN_STORY'
22+
DOMAIN = 'NewsTextAnalytics'
1823

19-
ld.open_session(config_name='./lseg-data.devrel.config.json')
24+
# Retrieve data
25+
# Callback function to display data or status events
26+
def display_event(eventType, event):
27+
currentTime = datetime.datetime.now().time()
28+
print("----------------------------------------------------------")
29+
print(">>> {} event received at {}".format(eventType, currentTime))
30+
print(json.dumps(event, indent=2))
31+
if eventType == "Update":
32+
process_mrn_update(event)
33+
return
2034

21-
ld.close_session()
35+
def process_mrn_update(message_json):
36+
"""Function process Update Message for MRN domain data"""
37+
fields_data = message_json['Fields']
38+
39+
# declare variables
40+
tot_size = 0
41+
guid = None
42+
43+
try:
44+
# Get data for all required fields
45+
fragment = base64.b64decode(fields_data['FRAGMENT'])
46+
frag_num = int(fields_data['FRAG_NUM'])
47+
guid = fields_data['GUID']
48+
mrn_src = fields_data['MRN_SRC']
49+
50+
#print("GUID = %s" % guid)
51+
#print("FRAG_NUM = %d" % frag_num)
52+
#print("MRN_SRC = %s" % mrn_src)
53+
54+
if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details
55+
guid_index = next((index for (index, d) in enumerate(_news_envelopes) if d['GUID'] == guid), None)
56+
envelop = _news_envelopes[guid_index]
57+
if envelop and envelop['data']['MRN_SRC'] == mrn_src and frag_num == envelop['data']['FRAG_NUM'] + 1:
58+
print(f'process multiple fragments for guid {envelop["GUID"]}')
59+
60+
#print(f'fragment before merge = {len(envelop["data"]["FRAGMENT"])}')
61+
# Merge incoming data to existing news envelop and getting FRAGMENT and TOT_SIZE data to local variables
62+
fragment = envelop['data']['FRAGMENT'] = envelop['data']['FRAGMENT'] + fragment
63+
envelop['data']['FRAG_NUM'] = frag_num
64+
tot_size = envelop['data']['tot_size']
65+
print(f'TOT_SIZE = {tot_size}')
66+
print(f'Current FRAGMENT length = {len(fragment)}')
67+
68+
# The multiple fragments news are not completed, waiting.
69+
if tot_size != len(fragment):
70+
return None
71+
# The multiple fragments news are completed, delete associate GUID envelop
72+
elif tot_size == len(fragment):
73+
del _news_envelopes[guid_index]
74+
else:
75+
print(f'Error: Cannot find fragment for GUID {guid} with matching FRAG_NUM or MRN_SRC {mrn_src}')
76+
return None
77+
else: # FRAG_NUM = 1 The first fragment
78+
tot_size = int(fields_data['TOT_SIZE'])
79+
print(f'FRAGMENT length = {len(fragment)}')
80+
# The fragment news is not completed, waiting and add this news data to envelop object.
81+
if tot_size != len(fragment):
82+
print(f'Add new fragments to news envelop for guid {guid}')
83+
_news_envelopes.append({ # the envelop object is a Python dictionary with GUID as a key and other fields are data
84+
'GUID': guid,
85+
'data': {
86+
'FRAGMENT': fragment,
87+
'MRN_SRC': mrn_src,
88+
'FRAG_NUM': frag_num,
89+
"tot_size": tot_size
90+
}
91+
})
92+
return None
93+
94+
# News Fragment(s) completed, decompress and print data as JSON to console
95+
if tot_size == len(fragment):
96+
print(f'decompress News FRAGMENT(s) for GUID {guid}')
97+
decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)
98+
print(f'News = {json.loads(decompressed_data)}')
99+
100+
except KeyError as keyerror:
101+
print('KeyError exception: ', keyerror)
102+
except IndexError as indexerror:
103+
print('IndexError exception: ', indexerror)
104+
except binascii.Error as b64error:
105+
print('base64 decoding exception:', b64error)
106+
except zlib.error as error:
107+
print('zlib decompressing exception: ', error)
108+
# Some console environments like Windows may encounter this unicode display as a limitation of OS
109+
except UnicodeEncodeError as encodeerror:
110+
print(f'UnicodeEncodeError exception. Cannot decode unicode character for {guid} in this environment: ', encodeerror)
111+
except Exception as e:
112+
print('exception: ', sys.exc_info()[0])
113+
114+
if __name__ == '__main__':
115+
# Open the data session
116+
ld.open_session()
117+
#ld.open_session(config_name='./lseg-data.devrel.config.json')
118+
119+
# Create an OMM stream and register event callbacks
120+
stream = omm_stream.Definition(
121+
name=RIC_CODE,
122+
domain= DOMAIN).get_stream()
123+
124+
# Define the event callbacks
125+
# Refresh - the first full image we get back from the server
126+
stream.on_refresh(lambda event, item_stream : display_event("Refresh", event))
127+
128+
# Update - as and when field values change, we receive updates from the server
129+
stream.on_update(lambda event, item_stream : display_event("Update", event))
130+
131+
# Status - if data goes stale or item closes, we get a status message
132+
stream.on_status(lambda event, item_stream : display_event("Status", event))
133+
134+
# Other errors
135+
stream.on_error(lambda event, item_stream : display_event("Error", event))
136+
137+
# Open the stream
138+
139+
# Send request to server and open stream
140+
stream.open()
141+
# We should receive the initial Refresh for the current field values
142+
# followed by updates for the fields as and when they occur
143+
144+
try:
145+
while True:
146+
time.sleep(1)
147+
except KeyboardInterrupt:
148+
stream.close()
149+
# Close the session
150+
ld.close_session()

0 commit comments

Comments
 (0)