Skip to content

Commit 5f36ce8

Browse files
committed
Add a session version of code + using env (OMMStream-MRN-LD-Env.py)
Update Docker to run session version of code instead (OMMStream-MRN-LD-Env.py)
1 parent 3f0cd71 commit 5f36ce8

9 files changed

Lines changed: 235 additions & 7 deletions

File tree

.devcontainer/.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
RDP_MACHINEID=MACHINE_ID
2+
RDP_PASSWORD=PASSWORD
3+
APP_KEY=APP_KEY

.devcontainer/devcontainer.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "MRN Data Library version 2",
3+
"image": "python:3.11-slim-bookworm",
4+
"customizations": {
5+
"vscode": {
6+
"extensions": ["ms-python.python","ms-python.debugpy"]
7+
}
8+
},
9+
"runArgs": ["--env-file=.devcontainer/.env"],
10+
"postCreateCommand": "pip install --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --trusted-host pypi.org --no-cache-dir --user -r requirements.txt",
11+
"shutdownAction":"stopContainer"
12+
}

.dockerignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ note.txt
44
images/
55
ldlib_mrn/
66
mrn_notebook/
7-
src/OMMStream-MRN-LD.ipynb
7+
src/OMMStream-MRN-LD.ipynb
8+
.devcontainer/

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ __pycache__/
44
ldlib_mrn/
55
note.txt
66
src/lseg-data.devrel.config.json
7-
mrn_notebook/
7+
mrn_notebook/
8+
src/.env
9+
.devcontainer/.env

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ ENV PATH=/root/.local:$PATH \
2424

2525
# copy only the dependencies installation from the 1st stage image
2626
COPY --from=builder /root/.local /root/.local
27-
COPY src/* .
27+
COPY src/OMMStream-MRN-LD-Env.py .
2828

2929
#Run Python
30-
ENTRYPOINT ["python", "OMMStream-MRN-LD.py"]
30+
ENTRYPOINT ["python", "OMMStream-MRN-LD-Env.py"]

README.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Machine Readable News Example with LSEG Data Library for Python
22

3-
- Last update: April 2025
3+
- Last update: June 2025
44
- Compiler: Python
55
- Prerequisite: The Real-Time Distribution System or The Real-Time -- Optimized credentials (V1) with MRN service
66

@@ -196,7 +196,13 @@ If you are using the [Anaconda](https://anaconda.org/anaconda/conda)/[Miniconda]
196196

197197
## <a id="how_to_run_docker"></a>Bonus: How to run this example with Docker
198198

199-
1. Setup the ```src/lseg_data.config.json``` file based on your preference like the instructions above
199+
1. Setup a ```src/.env``` file based on your RDP credential with the following content
200+
201+
```ini
202+
RDP_MACHINEID=MACHINE_ID
203+
RDP_PASSWORD=PASSWORD
204+
APP_KEY=APP_KEY
205+
```
200206
2. Go to the project folder in the console
201207
3. Run the following command in a console to build an image from a Dockerfile.
202208

@@ -207,7 +213,7 @@ If you are using the [Anaconda](https://anaconda.org/anaconda/conda)/[Miniconda]
207213
4. Once the build is a success, you can create and run the container with the following command
208214

209215
```bash
210-
$> docker run --name mrn_python -it mrn_python mrn_python
216+
$> docker run -it --env-file .\src\.env --name mrn_python mrn_python
211217
```
212218

213219
5. Press Ctrl+C buttons to stop the application

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pandas==2.2.3
1414
pyee==12.0.0
1515
pyhumps==3.8.0
1616
python-dateutil==2.9.0.post0
17+
python-dotenv==1.1.1
1718
pytz==2025.2
1819
requests==2.32.3
1920
scipy==1.15.2

src/.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
RDP_MACHINEID=MACHINE_ID
2+
RDP_PASSWORD=PASSWORD
3+
APP_KEY=APP_KEY

src/OMMStream-MRN-LD-Env.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
# |-----------------------------------------------------------------------------
2+
# | This source code is provided under the Apache 2.0 license --
3+
# | and is provided AS IS with no warranty or guarantee of fit for purpose. --
4+
# | See the project's LICENSE.md for details. --
5+
# | Copyright LSEG 2025. All rights reserved. --
6+
# |-----------------------------------------------------------------------------
7+
8+
9+
#!/usr/bin/env python
10+
import os
11+
import sys
12+
import datetime
13+
import time
14+
import json
15+
import base64
16+
import zlib
17+
import binascii
18+
import lseg.data as ld
19+
from lseg.data.delivery import omm_stream
20+
from lseg.data import session
21+
from dotenv import load_dotenv
22+
23+
# list to contain the news envelopes
24+
_news_envelopes = []
25+
RIC_CODE = 'MRN_STORY'
26+
DOMAIN = 'NewsTextAnalytics'
27+
SERVICE = 'ELEKTRON_DD'
28+
29+
# Config the encoding for the console
30+
sys.stdin.reconfigure(encoding='utf-8')
31+
sys.stdout.reconfigure(encoding='utf-8')
32+
33+
username = ''
34+
password = ''
35+
app_key = ''
36+
37+
38+
def display_event(eventType, event):
39+
"""Retrieve data: Callback function to display data or status events. """
40+
current_time = datetime.datetime.now().time()
41+
print('----------------------------------------------------------')
42+
print(f'>>> {eventType} event received at {current_time}')
43+
print(json.dumps(event, indent=2))
44+
45+
return
46+
47+
48+
def process_mrn_update(event):
49+
"""Process MRN Update messages."""
50+
message_json = event
51+
fields_data = message_json['Fields']
52+
53+
# declare variables
54+
tot_size = 0
55+
guid = None
56+
57+
try:
58+
# Get data for all required fields
59+
fragment = base64.b64decode(fields_data['FRAGMENT'])
60+
frag_num = int(fields_data['FRAG_NUM'])
61+
guid = fields_data['GUID']
62+
mrn_src = fields_data['MRN_SRC']
63+
64+
# print("GUID = %s" % guid)
65+
# print("FRAG_NUM = %d" % frag_num)
66+
# print("MRN_SRC = %s" % mrn_src)
67+
68+
if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details
69+
guid_index = next((index for (index, d) in enumerate(
70+
_news_envelopes) if d['GUID'] == guid), None)
71+
envelop = _news_envelopes[guid_index]
72+
if envelop and envelop['data']['MRN_SRC'] == mrn_src and frag_num == envelop['data']['FRAG_NUM'] + 1:
73+
print(f'process multiple fragments for guid {envelop["GUID"]}')
74+
75+
# print(f'fragment before merge = {len(envelop["data"]["FRAGMENT"])}')
76+
# Merge incoming data to existing news envelop and getting FRAGMENT and TOT_SIZE data to local variables
77+
fragment = envelop['data']['FRAGMENT'] = envelop['data']['FRAGMENT'] + fragment
78+
envelop['data']['FRAG_NUM'] = frag_num
79+
tot_size = envelop['data']['tot_size']
80+
print(f'TOT_SIZE = {tot_size}')
81+
print(f'Current FRAGMENT length = {len(fragment)}')
82+
83+
# The multiple fragments news are not completed, waiting.
84+
if tot_size != len(fragment):
85+
return None
86+
# The multiple fragments news are completed, delete associate GUID envelop
87+
elif tot_size == len(fragment):
88+
del _news_envelopes[guid_index]
89+
else:
90+
print(
91+
f'Error: Cannot find fragment for GUID {guid} with matching FRAG_NUM or MRN_SRC {mrn_src}')
92+
return None
93+
else: # FRAG_NUM = 1 The first fragment
94+
tot_size = int(fields_data['TOT_SIZE'])
95+
print(f'FRAGMENT length = {len(fragment)}')
96+
# The fragment news is not completed, waiting and add this news data to envelop object.
97+
if tot_size != len(fragment):
98+
print(f'Add new fragments to news envelop for guid {guid}')
99+
_news_envelopes.append({ # the envelop object is a Python dictionary with GUID as a key and other fields are data
100+
'GUID': guid,
101+
'data': {
102+
'FRAGMENT': fragment,
103+
'MRN_SRC': mrn_src,
104+
'FRAG_NUM': frag_num,
105+
"tot_size": tot_size
106+
}
107+
})
108+
return None
109+
110+
# News Fragment(s) completed, decompress and print data as JSON to console
111+
if tot_size == len(fragment):
112+
print(f'decompress News FRAGMENT(s) for GUID {guid}')
113+
decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)
114+
print(f'News = {json.loads(decompressed_data)}')
115+
116+
except KeyError as keyerror:
117+
print('KeyError exception: ', keyerror)
118+
except IndexError as indexerror:
119+
print('IndexError exception: ', indexerror)
120+
except binascii.Error as b64error:
121+
print('base64 decoding exception:', b64error)
122+
except zlib.error as error:
123+
print('zlib decompressing exception: ', error)
124+
# Some console environments like Windows may encounter this unicode display as a limitation of OS
125+
except UnicodeEncodeError as encodeerror:
126+
print(
127+
f'UnicodeEncodeError exception. Cannot decode unicode character for {guid} in this environment: ', encodeerror)
128+
except Exception as specific_error:
129+
print(f'exception: str{specific_error}', sys.exc_info()[0])
130+
131+
132+
if __name__ == '__main__':
133+
try:
134+
load_dotenv()
135+
username = os.getenv('RDP_MACHINEID')
136+
password = os.getenv('RDP_PASSWORD')
137+
app_key = os.getenv('APP_KEY')
138+
139+
if not username or not password or not app_key:
140+
print("RDP_MACHINEID, RDP_PASSWORD, and APP_KEY are required environment variables")
141+
sys.exit(2)
142+
143+
# Open the data session
144+
session = ld.session.platform.Definition(
145+
app_key=app_key,
146+
grant=ld.session.platform.GrantPassword(
147+
username=username,
148+
password=password
149+
),
150+
signon_control=True
151+
).get_session()
152+
ld.session.set_default(session)
153+
session.open()
154+
# ld.open_session()
155+
# ld.open_session(config_name='./lseg-data.devrel.config.json')
156+
except Exception as ex:
157+
print(f'Error in open_session: {str(ex)}')
158+
sys.exit(1)
159+
160+
if str(session.open_state) == 'OpenState.Opened':
161+
print('Session opens, starting request data')
162+
else:
163+
print('Session open fail, exit an application')
164+
sys.exit(1)
165+
166+
# Create an OMM stream and register event callbacks
167+
stream = omm_stream.Definition(
168+
name=RIC_CODE,
169+
domain=DOMAIN,
170+
service=SERVICE).get_stream()
171+
172+
# Define the event callbacks
173+
# Refresh - the first full image we get back from the server
174+
stream.on_refresh(
175+
lambda event, item_stream: display_event('Refresh', event))
176+
177+
# Update - as and when field values change, we receive updates from the server and process the MRN data
178+
stream.on_update(lambda event, item_stream: process_mrn_update(event))
179+
180+
# Status - if data goes stale or item closes, we get a status message
181+
stream.on_status(lambda event, item_stream: display_event('Status', event))
182+
183+
# Other errors
184+
stream.on_error(lambda event, item_stream: display_event('Error', event))
185+
186+
# Open the stream
187+
188+
# Send request to server and open stream
189+
stream.open()
190+
# We should receive the initial Refresh for the current field values
191+
# followed by updates for the fields as and when they occur
192+
193+
try:
194+
while True:
195+
time.sleep(1)
196+
except KeyboardInterrupt:
197+
stream.close()
198+
# Close the session
199+
session.close()
200+
ld.close_session()

0 commit comments

Comments
 (0)