-
Notifications
You must be signed in to change notification settings - Fork 174
/
association.py
4364 lines (3701 loc) · 168 KB
/
association.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Defines the Association class which handles associating with peers."""
from io import BytesIO
import logging
import os
from pathlib import Path
import threading
import time
from typing import (
Callable,
Any,
Iterator,
TYPE_CHECKING,
cast,
)
import warnings
from pydicom import dcmread
from pydicom.dataset import Dataset
from pydicom.tag import BaseTag
from pydicom.uid import UID, ImplicitVRLittleEndian, ExplicitVRBigEndian
# pylint: disable=no-name-in-module
from pynetdicom.acse import ACSE
from pynetdicom import _config, evt
from pynetdicom.dimse import DIMSEServiceProvider
from pynetdicom.dimse_primitives import (
C_ECHO,
C_MOVE,
C_STORE,
C_GET,
C_FIND,
C_CANCEL,
N_EVENT_REPORT,
N_GET,
N_SET,
N_CREATE,
N_ACTION,
N_DELETE,
DimseServiceType,
)
from pynetdicom.dsutils import decode, encode, pretty_dataset, split_dataset
from pynetdicom.dul import DULServiceProvider
from pynetdicom._globals import (
MODE_REQUESTOR,
MODE_ACCEPTOR,
DEFAULT_MAX_LENGTH,
STATUS_WARNING,
STATUS_SUCCESS,
STATUS_CANCEL,
STATUS_PENDING,
STATUS_FAILURE,
)
from pynetdicom._handlers import (
standard_dimse_recv_handler,
standard_dimse_sent_handler,
standard_pdu_recv_handler,
standard_pdu_sent_handler,
)
from pynetdicom.pdu_primitives import (
UserIdentityNegotiation,
MaximumLengthNotification,
ImplementationClassUIDNotification,
ImplementationVersionNameNotification,
AsynchronousOperationsWindowNegotiation,
SOPClassExtendedNegotiation,
SOPClassCommonExtendedNegotiation,
SCP_SCU_RoleSelectionNegotiation,
A_ASSOCIATE,
_UI,
_UITypes,
)
from pynetdicom.presentation import PresentationContext
from pynetdicom.sop_class import ( # type: ignore
RepositoryQuery,
uid_to_service_class,
UnifiedProcedureStepPull,
UnifiedProcedureStepPush,
UnifiedProcedureStepEvent,
UnifiedProcedureStepQuery,
UnifiedProcedureStepWatch,
Verification,
)
from pynetdicom.status import code_to_category, STORAGE_SERVICE_CLASS_STATUS
from pynetdicom.utils import make_target, set_timer_resolution, set_ae, decode_bytes
if TYPE_CHECKING: # pragma: no cover
from pynetdicom.ae import ApplicationEntity
from pynetdicom.transport import AssociationServer, AssociationSocket
# pylint: enable=no-name-in-module
LOGGER = logging.getLogger(__name__)
# Type of the per-association handler registry: maps an event to either a
# list of ``(handler, args)`` 2-tuples or a single ``(handler, args)``
# 2-tuple, where ``args`` is ``None`` or a list of extra handler arguments
HandlerType = dict[
    evt.EventType,
    (list[tuple[Callable, None | list[Any]]] | tuple[Callable, None | list[Any]]),
]
class Association(threading.Thread):
"""Manage an Association with a peer AE.
Attributes
----------
acceptor : association.ServiceUser
Representation of the association's *acceptor* AE.
acse : acse.ACSE
The Association Control Service Element provider.
dimse : dimse.DIMSEServiceProvider
The DICOM Message Service Element provider.
dul : dul.DULServiceProvider
The DICOM Upper Layer service provider.
is_aborted : bool
``True`` if the association has been aborted, ``False`` otherwise.
is_established : bool
``True`` if the association has been established, ``False`` otherwise.
is_rejected : bool
``True`` if the association was rejected, ``False`` otherwise.
is_released : bool
``True`` if the association has been released, ``False`` otherwise.
network_timeout_response : str
If ``"A-RELEASE"`` then initiate a normal association release on expiry of the
network timeout, otherwise issue an A-ABORT (default).
requestor : association.ServiceUser
Representation of the association's *requestor* AE.
"""
def __init__(self, ae: "ApplicationEntity", mode: str) -> None:
    """Create a new :class:`Association` instance.

    The association starts in State 1 (idle). Association negotiation
    won't begin until an :class:`~pynetdicom.transport.AssociationSocket`
    is assigned using :meth:`set_socket` and
    :meth:`~threading.Thread.start` is called.

    Parameters
    ----------
    ae : ae.ApplicationEntity
        The local AE.
    mode : str
        Must be ``'requestor'`` or ``'acceptor'`` (compared
        case-insensitively by the :attr:`mode` setter).

    Raises
    ------
    ValueError
        If `mode` is not one of the allowed values (raised by the
        :attr:`mode` setter).
    """
    # Must be set first: the `lock` property used by the setters below
    # reads the lock from the parent AE
    self._ae: "ApplicationEntity" = ae
    # Validated and normalised to lower case by the `mode` setter
    self.mode: str = mode

    # If acceptor this is the parent AssociationServer, used to identify
    # the thread when updating bound event-handlers
    self._server: None | "AssociationServer" = None

    # Represents the association requestor and acceptor users
    self.requestor: ServiceUser = ServiceUser(self, MODE_REQUESTOR)
    self.acceptor: ServiceUser = ServiceUser(self, MODE_ACCEPTOR)

    # Status attributes
    self.is_established: bool = False
    self.is_rejected: bool = False
    self.is_aborted: bool = False
    self.is_released: bool = False

    # Track whether we've sent an abort or not for the abort() method
    self._sent_abort: bool = False

    # Accepted and rejected presentation contexts
    # Accepted contexts are keyed by their presentation context ID
    self._accepted_cx: dict[int, PresentationContext] = {}
    self._rejected_cx: list[PresentationContext] = []

    # Service providers
    self.acse: ACSE = ACSE(self)
    self.dul: DULServiceProvider = DULServiceProvider(self)
    self.dimse: DIMSEServiceProvider = DIMSEServiceProvider(self)

    # Timeouts (in seconds), needs to be set after DUL init because the
    # `acse_timeout` and `network_timeout` setters also update timers
    # owned by the DUL service provider
    self.acse_timeout: float | None = self.ae.acse_timeout
    self.connection_timeout: float | None = self.ae.connection_timeout
    self.dimse_timeout: float | None = self.ae.dimse_timeout
    self.network_timeout: float | None = self.ae.network_timeout

    # Allow customising the response to a network timeout: "A-RELEASE"
    # triggers a normal release, any other value results in an abort
    self.network_timeout_response: str = "A-ABORT"

    # Event handlers
    self._handlers: HandlerType = {}
    self._bind_defaults()

    # Kills the thread loop in run()
    self._kill: bool = False
    # Flag for whether or not the DUL thread has been started
    self._started_dul: bool = False
    # Used to pause the association reactor until the DUL is ready
    self._dul_ready: threading.Event = threading.Event()
    # Used to pause the association reactor while a service is being used
    # (set == reactor may run, cleared == reactor blocks at its checkpoint)
    self._reactor_checkpoint: threading.Event = threading.Event()
    self._reactor_checkpoint.set()
    # Used to ensure the reactor is paused before DIMSE messaging
    self._is_paused: bool = False

    # Windows timer resolution
    self._timer_resolution: float | None = _config.WINDOWS_TIMER_RESOLUTION

    # Thread setup: the thread's target is the (wrapped) association reactor
    threading.Thread.__init__(self, target=make_target(self.run_reactor))
    self.daemon: bool = True
def abort(self) -> None:
    """Abort the :class:`Association` by sending an A-ABORT to the remote
    AE.

    Only the first call sends an A-ABORT; later calls return immediately.
    If the association has already been released no A-ABORT is sent and
    no ``EVT_ABORTED`` event is triggered, but the association thread is
    still killed.
    """
    # Only allow a single abort message to be sent
    if self._sent_abort:
        return

    if not self.is_released:
        # Set before restarting the reactor to prevent race condition
        self._sent_abort = True
        # Ensure the reactor is running so it can be exited
        self._reactor_checkpoint.set()
        LOGGER.info("Aborting Association")
        # 0x00 is the value passed through to the ACSE provider's
        # send_abort()
        self.acse.send_abort(0x00)

        # Event handler - association aborted
        evt.trigger(self, evt.EVT_ABORTED, {})

    # Stop the reactor loop and the DUL service provider
    self.kill()

    # Add short delay to ensure everything shuts down
    time.sleep(0.1)
@property
def accepted_contexts(self) -> list[PresentationContext]:
    """Return the accepted presentation contexts ordered by context ID.

    Returns
    -------
    list of presentation.PresentationContext
        A new list containing every accepted
        :class:`~pynetdicom.presentation.PresentationContext`, sorted by
        ascending *Presentation Context ID*.
    """

    def _context_id(cx: PresentationContext) -> int:
        """Sort key: the context's ID."""
        return cast(int, cx.context_id)

    # Internally the accepted contexts are kept as {context ID : context}
    contexts = list(self._accepted_cx.values())
    contexts.sort(key=_context_id)
    return contexts
@property
def acse_timeout(self) -> float | None:
"""The ACSE timeout (in seconds)."""
return self._acse_timeout
@acse_timeout.setter
def acse_timeout(self, value: float | None) -> None:
"""Set the ACSE timeout using numeric or ``None``."""
with self.lock:
self.dul.artim_timer.timeout = value
self._acse_timeout = value
@property
def ae(self) -> "ApplicationEntity":
    """The :class:`~pynetdicom.ae.ApplicationEntity` that owns this
    association (read-only).
    """
    parent = self._ae
    return parent
def bind(
    self, event: evt.EventType, handler: Callable, args: None | list[Any] = None
) -> None:
    """Register `handler` to be called when `event` occurs.

    .. versionadded:: 1.3

    .. versionchanged:: 1.5

        Added `args` keyword parameter.

    Parameters
    ----------
    event : collections.namedtuple
        The event the handler should be bound to.
    handler : callable
        The callable to run whenever the event occurs.
    args : list, optional
        Extra arguments to pass to the handler, by default ``None``
        (no extra arguments).
    """
    entry = (handler, args)
    # Hold the AE lock so `_handlers` isn't read while it's being modified
    with self.lock:
        evt._add_handler(event, self._handlers, entry)
def _bind_defaults(self) -> None:
    """Bind the default handlers for intervention and logging events.

    Every intervention event gets its default handler. The standard
    DIMSE/PDU logging handlers are only bound when the configured log
    handler level is ``"standard"``.
    """
    # Intervention events: one default handler each
    for intervention_event in evt._INTERVENTION_EVENTS:
        self.bind(intervention_event, evt.get_default_handler(intervention_event))

    # Notification events: only the standard logging handlers
    if _config.LOG_HANDLER_LEVEL != "standard":
        return

    logging_handlers = (
        (evt.EVT_DIMSE_RECV, standard_dimse_recv_handler),
        (evt.EVT_DIMSE_SENT, standard_dimse_sent_handler),
        (evt.EVT_PDU_RECV, standard_pdu_recv_handler),
        (evt.EVT_PDU_SENT, standard_pdu_sent_handler),
    )
    for notification_event, log_handler in logging_handlers:
        self.bind(notification_event, log_handler)
def _check_received_status(self, rsp: DimseServiceType) -> Dataset:
    """Build a status :class:`~pydicom.dataset.Dataset` from a response.

    Parameters
    ----------
    rsp : dimse_primitives.DIMSEMessage
        The DIMSE primitive received from the peer as the response to a
        service request.

    Returns
    -------
    pydicom.dataset.Dataset
        For a valid response, a dataset containing the (0000,0900)
        *Status* element plus any optional status related elements that
        are set on the primitive. For an invalid response an empty
        dataset is returned, after logging an error and aborting the
        association.
    """
    status = Dataset()

    if not rsp.is_valid_response:
        # Primitive class names use underscores, e.g. C_ECHO -> C-ECHO
        msg_type = rsp.__class__.__name__.replace("_", "-")
        LOGGER.error(f"Received an invalid {msg_type} response from the peer")
        self.abort()
        return status

    status.Status = rsp.Status
    # Copy across only the optional status elements that have a value
    for keyword in rsp.STATUS_OPTIONAL_KEYWORDS:
        if getattr(rsp, keyword, None) is None:
            continue

        setattr(status, keyword, getattr(rsp, keyword))

    return status
@property
def dimse_timeout(self) -> int | float | None:
"""The DIMSE timeout (in seconds)."""
return self._dimse_timeout
@dimse_timeout.setter
def dimse_timeout(self, value: int | float | None) -> None:
"""Set the DIMSE timeout using numeric or ``None``."""
with self.lock:
self._dimse_timeout = value
def get_events(self) -> list[evt.EventType]:
    """Return the events that currently have handlers bound.

    .. versionadded:: 1.3

    Returns
    -------
    list of events
        The bound events, sorted by their ``name`` attribute.
    """

    def _event_name(event: Any) -> Any:
        """Sort key: the event's name."""
        return event.name

    bound_events = list(self._handlers.keys())
    bound_events.sort(key=_event_name)
    return bound_events
def get_handlers(self, event: evt.EventType) -> evt.HandlerArgType:
    """Return whatever is bound to `event` in the handler registry.

    .. versionadded:: 1.3

    .. versionchanged:: 1.5

        Returns a 2-tuple of (callable, args) or list of 2-tuple.

    Parameters
    ----------
    event : namedtuple
        The event whose handlers are wanted.

    Returns
    -------
    2-tuple of (callable, args), list of 2-tuple
        The entry stored for `event`: per the registry's type this is
        either a list of ``(callable, args)`` 2-tuples or a single
        ``(callable, args)`` 2-tuple. If nothing is stored for `event`
        then an empty list is returned.
    """
    if event in self._handlers:
        return self._handlers[event]

    # Nothing has been bound to this event
    return []
def _get_valid_context(
    self,
    ab_syntax: str | UID,
    tr_syntax: str | UID,
    role: str | None = None,
    context_id: int | None = None,
    allow_conversion: bool = True,
) -> PresentationContext:
    """Return a valid presentation context matching the parameters.

    .. versionchanged:: 1.5

        Changed to prefer an exact matching context over a convertible one
        and to reject contexts without matching endianness

    .. versionchanged:: 2.0

        Added `allow_conversion` keyword parameter.

    Parameters
    ----------
    ab_syntax : str or pydicom.uid.UID
        The abstract syntax to match.
    tr_syntax : str or pydicom.uid.UID
        The transfer syntax to match, if an empty string is used then
        the transfer syntax will not be used for matching. If the value
        corresponds to an uncompressed syntax then matches will be made
        with any uncompressed transfer syntax but an exact match will
        be preferred.
    role : str, optional
        One of ``'scu'`` or ``'scp'``, the required role of the context.
        If not used then the accepted role will be ignored.
    context_id : int, optional
        If used then the ID of the presentation context to use. It
        will be checked against the available parameter values. If the ID
        isn't found then will check against all accepted contexts.
    allow_conversion : bool, optional
        If ``True`` (default), then if there's no exact matching accepted
        presentation context then use a convertible one instead. If
        ``False`` then an exact matching context is required. Has no
        effect when `tr_syntax` is empty, as the transfer syntax is then
        not part of the match.

    Returns
    -------
    presentation.PresentationContext
        An accepted presentation context.

    Raises
    ------
    ValueError
        If no accepted presentation context matches the parameters.
    """
    ab_syntax = UID(ab_syntax)
    tr_syntax = UID(tr_syntax)

    # Prefer the requested context ID, otherwise consider every accepted
    # context (a `context_id` of None is never a key, so it falls through)
    try:
        possible_contexts = [self._accepted_cx[context_id]]  # type: ignore
    except KeyError:
        possible_contexts = self.accepted_contexts

    # Filter by abstract syntax
    possible_contexts = [
        cx for cx in possible_contexts if ab_syntax == cx.abstract_syntax
    ]

    # For UPS we can also match UPS Push to Pull/Watch/Event/Query
    if ab_syntax == UnifiedProcedureStepPush and not possible_contexts:
        LOGGER.info(
            "No exact matching context found for 'Unified Procedure Step "
            "- Push SOP Class', checking accepted contexts for other UPS "
            "SOP classes"
        )
        ups = [
            UnifiedProcedureStepPull,
            UnifiedProcedureStepWatch,
            UnifiedProcedureStepEvent,
            UnifiedProcedureStepQuery,
        ]
        possible_contexts.extend(
            [cx for cx in self._accepted_cx.values() if cx.abstract_syntax in ups]
        )

    # Filter by role
    if role == "scu":
        possible_contexts = [cx for cx in possible_contexts if cx.as_scu is True]

    if role == "scp":
        possible_contexts = [cx for cx in possible_contexts if cx.as_scp is True]

    matches = []
    for cx in possible_contexts:
        if not tr_syntax:
            # The transfer syntax isn't used for matching, so the first
            # remaining candidate is a full match irrespective of
            # `allow_conversion` (previously this case raised ValueError
            # when `allow_conversion` was False)
            return cx

        cx_syntax = cx.transfer_syntax[0]
        if tr_syntax == cx_syntax:
            # Exact match to transfer syntax
            return cx

        # Compressed transfer syntaxes are not convertible
        # This excludes deflated transfer syntaxes
        if tr_syntax.is_compressed or cx_syntax.is_compressed:
            continue

        # Filter out contexts where the endianness doesn't match
        if tr_syntax.is_little_endian != cx_syntax.is_little_endian:
            continue

        # Match to convertible transfer syntaxes
        # Allowable matches:
        #   explicit VR <-> implicit VR
        #   deflated <-> inflated
        matches.append(cx)

    if allow_conversion and matches:
        return matches[0]

    # Only used for the error message, the role filter has already run
    role = role or "scu"
    msg = (
        f"No presentation context for '{ab_syntax.name}' has been "
        f"accepted by the peer"
    )
    if tr_syntax:
        msg += f" with '{tr_syntax.name}' transfer syntax"
    msg += f" for the {role.upper()} role"

    LOGGER.error(msg)
    raise ValueError(msg)
def _handle_no_response(self) -> None:
    """React to a DIMSE timeout or a missing response message.

    Shared by the ``send_*`` methods so the behaviour only needs to be
    unit tested once.
    """
    acse = self.acse

    if acse.is_aborted("a-abort"):
        # The main reactor loop takes care of logging the A-ABORT
        return

    if acse.is_aborted("a-p-abort"):
        # Evt17 occurred while in Sta6
        LOGGER.error("Connection closed while waiting for DIMSE message")
        return

    if self.is_established:
        LOGGER.error("DIMSE timeout reached while waiting for message response")
        self.abort()
@property
def is_acceptor(self) -> bool:
    """``True`` when the local AE is acting as the association
    *acceptor*, ``False`` otherwise.
    """
    current_mode = self.mode
    return current_mode == MODE_ACCEPTOR
@property
def is_requestor(self) -> bool:
    """``True`` when the local AE is acting as the association
    *requestor*, ``False`` otherwise.
    """
    current_mode = self.mode
    return current_mode == MODE_REQUESTOR
def kill(self) -> None:
    """Stop the :class:`Association` thread and its DUL service provider.

    Releases the reactor checkpoint, flags the reactor loop to exit,
    marks the association as no longer established and then polls until
    the DUL thread is either no longer alive or reports that it stopped.
    """
    # The reactor must be running in order to see the kill flag and exit
    self._reactor_checkpoint.set()
    self._kill = True
    self.is_established = False
    self._is_paused = True

    # Keep asking the DUL to stop until it's dead or confirms the stop
    while True:
        if not self.dul.is_alive():
            break

        if self.dul.stop_dul():
            break

        time.sleep(0.01)
@property
def local(self) -> dict[str, Any]:
    """Information about the local AE as a :class:`dict`.

    This is the ``info`` of the acceptor service user when the local AE
    is the acceptor, otherwise that of the requestor service user.
    """
    local_user = self.acceptor if self.is_acceptor else self.requestor
    return local_user.info
@property
def lock(self) -> threading.Lock:
    """The :class:`threading.Lock` owned by the parent AE."""
    parent = self.ae
    return parent._lock
@property
def mode(self) -> str:
    """The Association's `mode` as a :class:`str`.

    Parameters
    ----------
    mode : str
        Either ``'requestor'`` or ``'acceptor'``. With ``'requestor'``
        it's assumed the local AE requests associations with peers and
        (by default) acts as the SCU. With ``'acceptor'`` it's assumed
        the local AE listens for association requests and (by default)
        acts as the SCP.
    """
    return self._mode


@mode.setter
def mode(self, mode: str) -> None:
    """Set the Association's mode.

    The value is lower-cased before being validated and stored.

    Raises
    ------
    ValueError
        If `mode` is neither ``'requestor'`` nor ``'acceptor'``.
    """
    normalised = mode.lower()
    if normalised in (MODE_REQUESTOR, MODE_ACCEPTOR):
        # pylint: disable=attribute-defined-outside-init
        self._mode = normalised
        return

    raise ValueError(
        "Invalid association `mode` value, must be either 'requestor' "
        "or 'acceptor'"
    )
@property
def network_timeout(self) -> int | float | None:
"""The network timeout (in seconds)."""
return self._network_timeout
@network_timeout.setter
def network_timeout(self, value: int | float | None) -> None:
"""Set the network timeout using numeric or ``None``."""
with self.lock:
self.dul._idle_timer.timeout = value
self._network_timeout = value
@property
def rejected_contexts(self) -> list[PresentationContext]:
    """The rejected
    :class:`~pynetdicom.presentation.PresentationContext` items as a
    :class:`list` (the internal list itself, not a copy).
    """
    rejected = self._rejected_cx
    return rejected
def release(self) -> None:
    """Initiate association release by sending an A-RELEASE request.

    Does nothing unless the association is currently established.
    """
    if not self.is_established:
        return

    # Pause the reactor first so it can't steal the incoming ACSE
    # messages that belong to the release negotiation
    self._reactor_checkpoint.clear()
    while True:
        if self._is_paused:
            break

        time.sleep(0.0001)

    LOGGER.info("Releasing Association")
    self.acse.negotiate_release()

    # Let the reactor run again
    self._reactor_checkpoint.set()
@property
def remote(self) -> dict[str, Any]:
    """Information about the peer AE as a :class:`dict`.

    This is the ``info`` of the requestor service user when the local AE
    is the acceptor, otherwise that of the acceptor service user.
    """
    peer_user = self.requestor if self.is_acceptor else self.acceptor
    return peer_user.info
def request(self) -> None:
    """Request an association with a peer.

    A request can only be made once the :class:`Association` instance has
    been configured for requestor mode and been assigned an
    :class:`~pynetdicom.transport.AssociationSocket`.

    Blocks until the DUL service provider signals it is ready and the
    association negotiation has completed.
    """
    # Start the DUL thread if not already started - a thread may only be
    # started once, so an unconditional start() would raise RuntimeError
    # if the DUL had already been started (e.g. by the reactor)
    if not self._started_dul:
        self.dul.start()
        self._started_dul = True

    # Wait until the DUL is up and running
    self._dul_ready.wait()

    # Start association negotiation
    LOGGER.info("Requesting Association")
    self.acse.negotiate_association()
def run_reactor(self) -> None:
    """The main :class:`Association` reactor.

    Starts the DUL service provider if required and waits for it to be
    ready. As acceptor: waits (up to the ACSE timeout) for the peer's
    association request, triggers ``EVT_REQUESTED``, negotiates the
    association and, if established, runs the reactor loop before
    shutting down the connection. As requestor: negotiates the
    association if that hasn't already been done and, if established,
    runs the reactor loop.
    """
    # Start the DUL thread if not already started
    if not self._started_dul:
        self.dul.start()
        self._started_dul = True

    # Wait until the DUL is up and running
    self._dul_ready.wait()

    if self.is_acceptor:
        # Block until the peer's request primitive arrives or the ACSE
        # timeout expires
        primitive = self.dul.receive_pdu(wait=True, timeout=self.acse_timeout)

        # Timed out waiting for A-ASSOCIATE request
        if primitive is None:
            self.kill()
            # Ensure the connection is shutdown properly
            sock = cast("AssociationSocket", self.dul.socket)
            if self._server and sock.socket:
                self._server.shutdown_request(sock.socket)

            return

        self.requestor.primitive = cast(A_ASSOCIATE, primitive)
        evt.trigger(self, evt.EVT_REQUESTED, {})

        # User used EVT_REQUESTED to send an A-ABORT or A-ASSOCIATE-RJ,
        # in which case there's nothing left to negotiate
        if not self.is_aborted and not self.is_rejected:
            self.acse.negotiate_association()

        if self.is_established:
            # The reactor loop runs with the configured timer resolution
            with set_timer_resolution(self._timer_resolution):
                self._run_reactor()

            # Ensure the connection is shutdown properly
            sock = cast("AssociationSocket", self.dul.socket)
            if self._server and sock.socket:
                self._server.shutdown_request(sock.socket)
    else:
        # Association requestor
        # Allow non-blocking negotiation: only negotiate here if the
        # association hasn't already reached any outcome
        if (
            not self.is_established
            and not self.is_aborted
            and not self.is_released
            and not self.is_rejected
        ):
            self.acse.negotiate_association()

        if self.is_established:
            with set_timer_resolution(self._timer_resolution):
                self._run_reactor()
def _run_reactor(self) -> None:
    """Run the ``Association`` reactor loop.

    Called by :meth:`run_reactor` for both acceptor and requestor
    associations once the association has been established. Loops until
    the kill flag is set or one of the exit conditions below is met.

    Main run loop

    1. Checks for incoming DIMSE messages
        If DIMSE message then run corresponding service class' SCP
        method
    2. Checks for peer A-RELEASE request primitive
        If present then kill thread
    3. Checks for peer A-ABORT request primitive
        If present then kill thread
    4. Checks DUL provider still running
        If not then kill thread
    5. Checks DUL idle timeout
        If timed out then release or abort, then kill thread
    """
    self._is_paused = False
    while not self._kill:
        time.sleep(0.001)

        # A race condition may occur if the Acceptor uses the send_*()
        # methods as the received DIMSE message may be taken off the
        # queue before the send_*() method gets to it, so we allow
        # the reactor to be paused
        # We also need to be careful that the reactor actually stops
        # before attempting DIMSE or ACSE messaging
        # Will block until `_reactor_checkpoint` is set()
        self._is_paused = True
        self._reactor_checkpoint.wait()
        self._is_paused = False

        # Check with the DIMSE provider to see if a completely decoded
        # message is available
        context_id, msg = self.dimse.get_msg(block=False)
        if msg:
            self._serve_request(msg, cast(int, context_id))

        # Check for release request
        if self.acse.is_release_requested():
            # Send A-RELEASE response
            self.acse.send_release(is_response=True)
            LOGGER.info("Association Released")
            self.is_released = True
            self.is_established = False
            evt.trigger(self, evt.EVT_RELEASED, {})
            self.kill()
            return

        # Check for abort
        if self.acse.is_aborted():
            log_msg = "Association Aborted"
            if self.acse.is_aborted("a-p-abort"):
                log_msg += " (A-P-ABORT)"

            LOGGER.info(log_msg)
            # Ensure that EVT_ACSE_RECV fires for subscribers
            self.dul.receive_pdu(wait=False)
            self.is_aborted = True
            self.is_established = False
            evt.trigger(self, evt.EVT_ABORTED, {})
            self.kill()
            return

        # Check if the DULServiceProvider thread is still running
        # DUL.is_alive() is inherited from threading.thread
        if not self.dul.is_alive():
            self.kill()
            return

        # Check if network_timeout has expired
        if self.dul.idle_timer_expired():
            LOGGER.error("Network timeout reached")
            if self.network_timeout_response == "A-RELEASE":
                # release() waits for `_is_paused` to be True before
                # messaging, so flag the reactor as paused around the call
                self._is_paused = True
                self._reactor_checkpoint.wait()
                self.release()
                self._is_paused = False
            else:
                self.abort()

            self.kill()
            return
def set_socket(self, socket: "AssociationSocket") -> None:
    """Assign the `socket` used for communicating with the peer.

    Parameters
    ----------
    socket : transport.AssociationSocket
        The socket to use.

    Raises
    ------
    RuntimeError
        If a socket has already been assigned to the
        :class:`Association`.
    """
    # A socket may only be assigned once
    if self.dul.socket is None:
        self.dul.socket = socket
        return

    raise RuntimeError("The Association already has a socket set")
def unbind(self, event: evt.EventType, handler: Callable) -> None:
    """Remove `handler` from the callables bound to `event`.

    .. versionadded:: 1.3

    Parameters
    ----------
    event : namedtuple
        The event the handler is currently bound to.
    handler : callable
        The callable that should no longer be run when the event occurs.
    """
    # Hold the AE lock so `_handlers` isn't read while it's being modified
    with self.lock:
        registry = self._handlers
        evt._remove_handler(event, registry, handler)
# DIMSE-C services provided by the Association
def _c_store_scp(self, req: C_STORE) -> None:
    """Service a C-STORE request received from the peer.

    Handles C-STORE requests sent by the peer over the same association
    the local AE used to send a C-MOVE or C-GET request. A C-STORE
    response is always sent back to the peer.

    C-STORE Request
    ---------------

    Parameters
    ~~~~~~~~~~

    (M) Message ID
    (M) Affected SOP Class UID
    (M) Affected SOP Instance UID
    (M) Priority
    (U) Move Originator Application Entity Title
    (U) Move Originator Message ID
    (M) Data Set

    Parameters
    ----------
    req : dimse_primitives.C_STORE
        The C-STORE request primitive received from the peer.
    """
    # C-STORE response primitive
    #   (U) Message ID
    #   (M) Message ID Being Responded To
    #   (U) Affected SOP Class UID
    #   (U) Affected SOP Instance UID
    #   (M) Status
    rsp = C_STORE()
    rsp.MessageID = req.MessageID
    rsp.MessageIDBeingRespondedTo = req.MessageID
    rsp.AffectedSOPInstanceUID = req.AffectedSOPInstanceUID
    rsp.AffectedSOPClassUID = req.AffectedSOPClassUID

    sop_class = cast(UID, req.AffectedSOPClassUID)
    try:
        context = self._get_valid_context(
            sop_class, "", "scp", context_id=req._context_id
        )
    except ValueError:
        # SOP Class not supported, no context ID?
        rsp.Status = 0x0122
        self.dimse.send_msg(rsp, 1)
        return

    # Hand the request to whatever is bound to EVT_C_STORE
    try:
        event_attrs = {"request": req, "context": context.as_tuple}
        status = evt.trigger(self, evt.EVT_C_STORE, event_attrs)
    except Exception as ex:
        LOGGER.error("Exception in the handler bound to 'evt.EVT_C_STORE'")
        LOGGER.exception(ex)
        rsp.Status = 0xC211
        self.dimse.send_msg(rsp, cast(int, context.context_id))
        return

    # Translate the handler's return value into the response status
    if not isinstance(status, (Dataset, int)):
        LOGGER.error("Invalid status returned by the EVT_C_STORE handler")
        rsp.Status = 0xC002
    elif not isinstance(status, Dataset):
        # A plain int status
        rsp.Status = status
    elif "Status" not in status:
        LOGGER.error(
            "The EVT_C_STORE handler returned a Dataset without a "
            "'Status' element"
        )
        rsp.Status = 0xC001
    else:
        # Copy each element of the status dataset onto the matching
        # attribute of the response primitive, if there is one
        for elem in status:
            if not hasattr(rsp, elem.keyword):
                LOGGER.warning(
                    "Status dataset returned from the EVT_C_STORE "
                    "handler contained an unsupported element "
                    f"'{elem.keyword}'"
                )
                continue

            setattr(rsp, elem.keyword, elem.value)

    if rsp.Status not in STORAGE_SERVICE_CLASS_STATUS:
        LOGGER.warning(
            "Unknown status value returned by the EVT_C_STORE handler: "
            f"0x{rsp.Status:04X}"
        )

    # Send C-STORE confirmation back to peer
    self.dimse.send_msg(rsp, cast(int, context.context_id))
def send_c_cancel(
    self,
    msg_id: int,
    context_id: int | None = None,
    query_model: str | UID | None = None,
) -> None:
    """Send a C-CANCEL request to the peer AE.

    .. versionchanged:: 2.0

        Added `query_model` and made `context_id` optional

    Parameters
    ----------
    msg_id : int
        The *Message ID* of the C-GET/C-MOVE/C-FIND operation that is to
        be cancelled. Must be between 0 and 65535, inclusive.
    context_id : int, optional
        The presentation context ID of the original C-GET/C-MOVE/C-FIND
        service request. Required if `query_model` is not used.
    query_model : str or pydicom.uid.UID, optional
        The query model used with the original C-GET/C-MOVE/C-FIND
        service request. Required if `context_id` is not used. Takes
        precedence over `context_id` when both are supplied.

    Raises
    ------
    RuntimeError
        If the association isn't established.
    ValueError
        If neither a `query_model` nor an integer `context_id` is given,
        or no accepted presentation context matches `query_model`.
    """
    # Can't send a C-CANCEL without an Association
    if not self.is_established:
        raise RuntimeError(
            "The association with a peer SCP must be "
            "established before sending a C-CANCEL request"
        )

    if isinstance(query_model, (str, UID)):
        # Look up the context ID from the query model
        context_id = self._get_valid_context(query_model, "", "scu").context_id
    elif not isinstance(context_id, int):
        raise ValueError(
            "'send_c_cancel' requires either the 'query_model' used for "
            "the service request or the corresponding 'context_id'"
        )

    # Build and send the C-CANCEL request
    cancel_rq = C_CANCEL()
    cancel_rq.MessageIDBeingRespondedTo = msg_id

    LOGGER.info("Sending C-CANCEL request")
    self.dimse.send_msg(cancel_rq, cast(int, context_id))
def send_c_echo(self, msg_id: int = 1) -> Dataset:
"""Send a C-ECHO request to the peer AE.
Parameters
----------
msg_id : int, optional
The C-ECHO request's *Message ID*, must be between 0 and 65535,
inclusive, (default ``1``).
Returns
-------
status : pydicom.dataset.Dataset
If the peer timed out, aborted or sent an invalid response then
returns an empty :class:`~pydicom.dataset.Dataset`. If a valid
response was received from the peer then returns a
:class:`~pydicom.dataset.Dataset` containing at least a
(0000,0900) *Status* element, and, depending on the returned
*Status* value, may optionally contain additional elements (see
DICOM Standard, Part 7, :dcm:`Annex C<part07/chapter_C.html>`).
The DICOM Standard, Part 7, :dcm:`Table 9.3-13
<part07/sect_9.3.5.2.html>` indicates that the *Status*
value of a C-ECHO response "shall have a value of Success". However
:dcm:`Section 9.1.5.1.4<part07/chapter_9.html#sect_9.1.5.1.4>`
indicates it may have any of the following
values:
Success
| ``0x0000`` - Success
Failure
| ``0x0122`` - SOP class not supported
| ``0x0210`` - Duplicate invocation
| ``0x0211`` - Unrecognised operation
| ``0x0212`` - Mistyped argument
As the actual status depends on the peer SCP, it shouldn't be
assumed that it will be one of these.
Raises
------
RuntimeError
If called without an association to a peer SCP.
ValueError
If the association has no accepted presentation context for
*Verification SOP Class*.
See Also
--------
:class:`~pynetdicom.dimse_primitives.C_ECHO`
:class:`~pynetdicom.service_class.VerificationServiceClass`
References
----------
* DICOM Standard, Part 4, :dcm:`Annex A<part04/chapter_A.html>`
* DICOM Standard, Part 7, Sections
:dcm:`9.1.5<part07/chapter_9.html#sect_9.1.5>`,
:dcm:`9.3.5<part07/sect_9.3.5.html>`, and
:dcm:`Annex C<part07/chapter_C.html>`
"""