27 pynetdicom
I’ll show you how to implement DICOM operations using pynetdicom, which is the most comprehensive Python library for DICOM networking. Since you prefer Python and work with medical imaging at Ramathibodi Hospital, these examples will be directly applicable to your work.
27.1 Installation and Setup
pip install pynetdicom pydicom
27.2 C-ECHO: Testing Connectivity
The simplest operation to verify your DICOM connection:
from pynetdicom import AE, debug_logger
from pynetdicom.sop_class import Verification
def perform_echo(server_ip, server_port, server_ae_title, client_ae_title):
    """
    Perform C-ECHO to test DICOM connectivity.

    Args:
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Returns:
        True only when the peer answered the C-ECHO with status 0x0000;
        False when the association could not be established, no usable
        response arrived, or any other status was returned.
    """
    # Create Application Entity
    ae = AE(ae_title=client_ae_title)

    # Add requested presentation context
    ae.add_requested_context(Verification)

    # Associate with peer AE
    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    if not assoc.is_established:
        print("Association rejected, aborted or never connected")
        return False

    print(f"Association established with {server_ae_title}")
    try:
        # Send C-ECHO request
        status = assoc.send_c_echo()
    finally:
        # Release association even if the request raised
        assoc.release()

    if not status:
        print("No response received")
        return False

    # Check the status of the response
    print(f'C-ECHO Response: 0x{status.Status:04X}')
    if status.Status == 0x0000:
        print("C-ECHO successful!")
        return True

    print(f"C-ECHO failed with status: {status.Status}")
    return False
# Usage: the boolean result tells the caller whether the peer is reachable.
success = perform_echo('192.168.1.100', 104, 'PACS_SERVER', 'MY_WORKSTATION')
27.3 C-STORE SCU: Sending Images
Send DICOM files to a PACS server:
from pynetdicom import AE, StoragePresentationContexts
from pydicom import dcmread
import os
def send_dicom_file(file_path, server_ip, server_port,
                    server_ae_title, client_ae_title):
    """
    Send a single DICOM file using C-STORE.

    Args:
        file_path: Path of the DICOM file to read and send.
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    The outcome is reported on stdout; nothing is returned.
    """
    # Read the DICOM file
    ds = dcmread(file_path)

    # Create Application Entity with all storage contexts
    ae = AE(ae_title=client_ae_title)
    ae.requested_contexts = StoragePresentationContexts

    # Associate with the peer AE
    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    if not assoc.is_established:
        print("Association rejected or aborted")
        return

    print(f"Association established with {server_ae_title}")
    try:
        # Send C-STORE request
        status = assoc.send_c_store(ds)

        if status:
            print(f'C-STORE Response Status: 0x{status.Status:04X}')
            if status.Status == 0x0000:
                print(f"Successfully stored: {ds.SOPInstanceUID}")
            else:
                print(f"Store failed with status: 0x{status.Status:04X}")
    finally:
        # Always release, including when send_c_store raises (for example
        # when no presentation context was accepted for this dataset).
        assoc.release()
def send_multiple_dicom_files(file_paths, server_ip, server_port,
                              server_ae_title, client_ae_title):
    """
    Send multiple DICOM files in a single association.

    Args:
        file_paths: Sequence of paths to DICOM files (its length is printed).
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Each file is attempted independently; a per-file result line and a
    final summary are printed. Nothing is returned.
    """
    ae = AE(ae_title=client_ae_title)
    ae.requested_contexts = StoragePresentationContexts

    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    if not assoc.is_established:
        print("Association failed")
        return

    print(f"Sending {len(file_paths)} files...")

    success_count = 0
    failure_count = 0

    try:
        for file_path in file_paths:
            name = os.path.basename(file_path)
            try:
                ds = dcmread(file_path)
                status = assoc.send_c_store(ds)
            except Exception as e:
                # Best effort: one unreadable or unsendable file must not
                # stop the rest of the batch.
                failure_count += 1
                print(f"✗ {name}: {str(e)}")
                continue

            if status and status.Status == 0x0000:
                success_count += 1
                print(f"✓ {name}")
            else:
                failure_count += 1
                # A falsy status has no Status element; reading it would
                # raise and the file would be counted as failed twice.
                detail = f"0x{status.Status:04X}" if status else "no response"
                print(f"✗ {name}: {detail}")
    finally:
        assoc.release()

    print(f"\nSummary: {success_count} successful, {failure_count} failed")
# Send entire directory
def send_directory(directory_path, server_ip, server_port,
                   server_ae_title, client_ae_title):
    """
    Send all DICOM files in a directory.

    The directory is walked recursively and every file whose name ends in
    ``.dcm`` (any letter case) is collected, in sorted order so repeated
    runs send files in the same sequence.

    Args:
        directory_path: Root directory to search.
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Nothing is returned; progress is printed.
    """
    dicom_files = sorted(
        os.path.join(root, name)
        for root, _dirs, names in os.walk(directory_path)
        for name in names
        if name.lower().endswith('.dcm')
    )

    if not dicom_files:
        print("No DICOM files found")
        return

    print(f"Found {len(dicom_files)} DICOM files")
    send_multiple_dicom_files(dicom_files, server_ip, server_port,
                              server_ae_title, client_ae_title)
27.4 C-STORE SCP: Receiving Images
Create a DICOM storage server to receive images:
from pynetdicom import AE, evt, AllStoragePresentationContexts
from pydicom.dataset import Dataset
import os
from datetime import datetime
class StorageHandler:
    """Handler for C-STORE requests.

    Received datasets are written below ``storage_dir`` as
    ``<StudyUID>/<SeriesUID>/<SOPInstanceUID>.dcm`` and counted in
    ``received_count``.
    """

    def __init__(self, storage_dir='DicomStorage'):
        self.storage_dir = storage_dir
        self.received_count = 0

    @staticmethod
    def _safe_component(value):
        """Reduce *value* to text that is safe as a single path component.

        The UIDs come from the remote peer, so anything other than letters,
        digits, ``.``, ``_`` and ``-`` is replaced and leading/trailing dots
        are stripped. This stops a crafted value such as ``..`` from
        escaping the storage directory.
        """
        cleaned = ''.join(
            ch if (ch.isalnum() or ch in '._-') else '_'
            for ch in str(value)
        ).strip('.')
        return cleaned or 'unknown'

    def handle_store(self, event):
        """
        Handle C-STORE request events.

        Args:
            event: The C-STORE event; its ``dataset`` and ``file_meta``
                attributes are read.

        Returns:
            0x0000 (success) once the dataset has been written.
        """
        # Get the dataset
        ds = event.dataset

        # Add file meta information
        ds.file_meta = event.file_meta

        # Get study, series, and SOP UIDs for directory structure
        study_uid = self._safe_component(ds.StudyInstanceUID)
        series_uid = self._safe_component(ds.SeriesInstanceUID)
        sop_uid = self._safe_component(ds.SOPInstanceUID)

        # Create directory structure: StudyUID/SeriesUID/
        path = os.path.join(self.storage_dir, study_uid, series_uid)
        os.makedirs(path, exist_ok=True)

        # Save the file
        filename = os.path.join(path, f'{sop_uid}.dcm')
        ds.save_as(filename, write_like_original=False)

        self.received_count += 1

        # Log the received file. These attributes are not guaranteed to be
        # present in every dataset, so fall back instead of raising.
        patient = getattr(ds, 'PatientName', 'N/A')
        modality = getattr(ds, 'Modality', 'N/A')
        description = getattr(ds, 'StudyDescription', 'N/A')
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Received: "
              f"{patient} - {modality} - {description}")
        print(f" Saved to: {filename}")

        # Return success status
        return 0x0000
def start_storage_scp(port=11112, ae_title='STORAGE_SCP',
                      storage_dir='DicomStorage'):
    """
    Start a DICOM Storage SCP server.

    Blocks until interrupted with Ctrl+C, then shuts the server down.

    Args:
        port: TCP port to listen on (all interfaces).
        ae_title: AE title of this storage SCP.
        storage_dir: Directory passed to ``StorageHandler`` for received
            files.
    """
    # Create handler instance
    handler = StorageHandler(storage_dir=storage_dir)

    # Create Application Entity
    ae = AE(ae_title=ae_title)

    # Accept all storage presentation contexts
    ae.supported_contexts = AllStoragePresentationContexts

    # Set up handlers
    handlers = [
        (evt.EVT_C_STORE, handler.handle_store),
        (evt.EVT_ACCEPTED, handle_accepted),
        (evt.EVT_RELEASED, handle_released),
    ]

    print(f"Starting Storage SCP on port {port}")
    print(f"AE Title: {ae_title}")
    print("Press Ctrl+C to stop\n")

    # Start listening for associations
    try:
        ae.start_server(('', port), evt_handlers=handlers, block=True)
    except KeyboardInterrupt:
        # The banner promises Ctrl+C stops the server, so exit without a
        # traceback and close the listening socket.
        print("\nStopping Storage SCP")
        ae.shutdown()
def handle_accepted(event):
    """Handle association accepted event.

    Prints the AE title of the peer that requested the association.
    """
    # On the accepting side the peer is the association requestor.
    # NOTE(review): `assoc.remote` appears to be a plain dict in pynetdicom,
    # so attribute access on it would fail; `requestor.ae_title` is used
    # instead - confirm against the installed version.
    title = event.assoc.requestor.ae_title
    if isinstance(title, bytes):
        # Presumably older releases expose the title as padded bytes.
        title = title.decode('ascii', 'replace')
    print(f"Association accepted from {str(title).strip()}")
def handle_released(event):
    """Handle association released event by logging a fixed message."""
    print("Association released")
# Run the server (blocks until interrupted)
if __name__ == "__main__":
    start_storage_scp(port=11112, ae_title='PYNETDICOM_SCP')
27.5 C-FIND: Querying for Studies
Query PACS for patient studies, series, or images:
from pynetdicom import AE
from pynetdicom.sop_class import (
PatientRootQueryRetrieveInformationModelFind,
StudyRootQueryRetrieveInformationModelFind
)from pydicom.dataset import Dataset
def find_studies(patient_id=None, patient_name=None, study_date=None,
                 modality=None, server_ip='localhost', server_port=104,
                 server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Query for studies using C-FIND.

    Args:
        patient_id: Patient ID to match; a wildcard is sent when omitted.
        patient_name: Patient name to match; blank (return key) if omitted.
        study_date: Study date to match; blank if omitted.
        modality: Value for ModalitiesInStudy; blank if omitted.
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Returns:
        List of the identifier datasets from the pending responses; empty
        when nothing matched or the association failed.
    """
    # Create query dataset
    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'

    # Patient level
    ds.PatientID = patient_id if patient_id else '*'
    ds.PatientName = patient_name if patient_name else ''
    ds.PatientBirthDate = ''
    ds.PatientSex = ''

    # Study level
    ds.StudyInstanceUID = ''
    ds.StudyDate = study_date if study_date else ''
    ds.StudyTime = ''
    ds.StudyDescription = ''
    ds.AccessionNumber = ''
    ds.StudyID = ''
    ds.ModalitiesInStudy = modality if modality else ''
    ds.NumberOfStudyRelatedSeries = ''
    ds.NumberOfStudyRelatedInstances = ''

    # The Study Root model is used because a STUDY-level query in the
    # Patient Root model needs a single-valued PatientID as its unique key,
    # which this function cannot guarantee (it defaults to a wildcard).
    query_model = StudyRootQueryRetrieveInformationModelFind

    # Create Application Entity
    ae = AE(ae_title=client_ae_title)
    ae.add_requested_context(query_model)

    # Associate with peer
    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    studies = []

    if not assoc.is_established:
        print("Association rejected")
        return studies

    print(f"Querying {server_ae_title} for studies...")
    try:
        # Send C-FIND request
        responses = assoc.send_c_find(ds, query_model)

        for (status, dataset) in responses:
            if not status:
                continue
            if status.Status in (0xFF00, 0xFF01):  # Pending
                if dataset:
                    studies.append(dataset)
                    # Return keys the peer does not support may be missing.
                    print(f"Found: {dataset.get('PatientName', '')} - "
                          f"{dataset.get('StudyDate', '')} - "
                          f"{dataset.get('StudyDescription', '')}")
            else:
                print(f'C-FIND completed with status: 0x{status.Status:04X}')
    finally:
        assoc.release()

    print(f"\nTotal studies found: {len(studies)}")
    return studies
def find_series(study_instance_uid, server_ip='localhost', server_port=104,
                server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Query for series within a study.

    Args:
        study_instance_uid: Study Instance UID whose series are listed.
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Returns:
        List of the identifier datasets from the pending responses; empty
        when nothing matched or the association failed.
    """
    ds = Dataset()
    ds.QueryRetrieveLevel = 'SERIES'

    # Study level (must include)
    ds.StudyInstanceUID = study_instance_uid

    # Series level
    ds.SeriesInstanceUID = ''
    ds.SeriesNumber = ''
    ds.SeriesDescription = ''
    ds.Modality = ''
    ds.SeriesDate = ''
    ds.SeriesTime = ''
    ds.NumberOfSeriesRelatedInstances = ''

    # Only the Study Instance UID is known here. The Patient Root model
    # would also require PatientID as a unique key above SERIES level, so
    # the Study Root model is the one that fits this identifier.
    query_model = StudyRootQueryRetrieveInformationModelFind

    ae = AE(ae_title=client_ae_title)
    ae.add_requested_context(query_model)

    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    series_list = []

    if not assoc.is_established:
        return series_list

    try:
        responses = assoc.send_c_find(ds, query_model)

        for (status, dataset) in responses:
            if status and status.Status in (0xFF00, 0xFF01) and dataset:
                series_list.append(dataset)
                # Return keys the peer does not support may be missing.
                print(f"Series {dataset.get('SeriesNumber', '')}: "
                      f"{dataset.get('Modality', '')} - "
                      f"{dataset.get('SeriesDescription', '')} "
                      f"({dataset.get('NumberOfSeriesRelatedInstances', '')} images)")
    finally:
        assoc.release()

    return series_list
# Advanced query with date range
def find_studies_date_range(start_date, end_date, modality=None,
                            server_ip='localhost', server_port=104,
                            server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Find studies within a date range.

    Date format: YYYYMMDD

    Args:
        start_date: First study date of the range (inclusive).
        end_date: Last study date of the range (inclusive).
        modality: Value for ModalitiesInStudy; blank if omitted.
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Returns:
        List of the identifier datasets from the pending responses; empty
        when nothing matched or the association failed.
    """
    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'

    # Patient level
    ds.PatientName = ''
    ds.PatientID = ''

    # Study level with date range
    ds.StudyInstanceUID = ''
    ds.StudyDate = f'{start_date}-{end_date}'  # Range matching
    ds.StudyDescription = ''
    ds.ModalitiesInStudy = modality if modality else ''

    # No PatientID is supplied, so the Study Root model is required; the
    # Patient Root model needs PatientID as a unique key at STUDY level.
    query_model = StudyRootQueryRetrieveInformationModelFind

    ae = AE(ae_title=client_ae_title)
    ae.add_requested_context(query_model)

    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    studies = []
    if not assoc.is_established:
        return studies

    try:
        responses = assoc.send_c_find(ds, query_model)

        for (status, dataset) in responses:
            if status and status.Status in (0xFF00, 0xFF01) and dataset:
                studies.append(dataset)
    finally:
        assoc.release()

    return studies
27.6 C-MOVE: Requesting Transfer to Another AE
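C-MOVE instructs the server to send the matching images to a destination AE. The destination must already be registered on the server (AE title, host, and port), and a Storage SCP must be listening there: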
from pynetdicom import AE
from pynetdicom.sop_class import (
PatientRootQueryRetrieveInformationModelMove,
StudyRootQueryRetrieveInformationModelMove
)from pydicom.dataset import Dataset
def move_study(study_uid, destination_ae, server_ip='localhost',
               server_port=104, server_ae_title='PACS',
               client_ae_title='PYNETDICOM'):
    """
    Request C-MOVE to send study to destination AE.

    Args:
        study_uid: Study Instance UID of the study to move.
        destination_ae: AE title the server should send the images to.
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Progress and the final outcome are printed; nothing is returned.
    """
    # Create query dataset
    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'
    ds.StudyInstanceUID = study_uid

    # Only the Study Instance UID is supplied, which is a complete
    # identifier for the Study Root model; the Patient Root model would
    # additionally require PatientID.
    query_model = StudyRootQueryRetrieveInformationModelMove

    # Create Application Entity
    ae = AE(ae_title=client_ae_title)
    ae.add_requested_context(query_model)

    # Associate with peer
    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    if not assoc.is_established:
        print("Association rejected")
        return

    print(f"Requesting move of study {study_uid} to {destination_ae}")
    try:
        # Send C-MOVE request
        responses = assoc.send_c_move(ds, destination_ae, query_model)

        for (status, dataset) in responses:
            if not status:
                continue

            # The sub-operation counters are not present in every
            # response, so read them defensively.
            completed = getattr(status, 'NumberOfCompletedSuboperations', 0)
            remaining = getattr(status, 'NumberOfRemainingSuboperations', 0)
            failed = getattr(status, 'NumberOfFailedSuboperations', 0)
            warnings = getattr(status, 'NumberOfWarningSuboperations', 0)

            # Check if pending (0xFF00)
            if status.Status == 0xFF00:
                print(f"Progress: {completed}/{completed + remaining}")
            elif status.Status == 0x0000:
                print("C-MOVE completed successfully")
                print(f" Completed: {completed}")
                print(f" Failed: {failed}")
                print(f" Warnings: {warnings}")
            else:
                print(f"C-MOVE failed with status: 0x{status.Status:04X}")
    finally:
        assoc.release()
def move_series(study_uid, series_uid, destination_ae,
                server_ip='localhost', server_port=104,
                server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Move specific series.

    Args:
        study_uid: Study Instance UID containing the series.
        series_uid: Series Instance UID of the series to move.
        destination_ae: AE title the server should send the images to.
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Progress and the final outcome are printed; nothing is returned.
    """
    ds = Dataset()
    ds.QueryRetrieveLevel = 'SERIES'
    ds.StudyInstanceUID = study_uid
    ds.SeriesInstanceUID = series_uid

    # Study + Series UIDs form a complete SERIES-level identifier only in
    # the Study Root model; Patient Root would also need PatientID.
    query_model = StudyRootQueryRetrieveInformationModelMove

    ae = AE(ae_title=client_ae_title)
    ae.add_requested_context(query_model)

    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    if not assoc.is_established:
        print("Association rejected")
        return

    try:
        responses = assoc.send_c_move(ds, destination_ae, query_model)

        for (status, dataset) in responses:
            if not status:
                continue
            if status.Status == 0xFF00:
                # Counters may be absent from a response; default to 0.
                remaining = getattr(status, 'NumberOfRemainingSuboperations', 0)
                completed = getattr(status, 'NumberOfCompletedSuboperations', 0)
                print(f"Moving... {completed}/{remaining + completed}")
            elif status.Status == 0x0000:
                print("Move completed successfully")
            else:
                # Previously any failure or warning status was ignored.
                print(f"Move failed with status: 0x{status.Status:04X}")
    finally:
        assoc.release()
# Move with progress callback
class MoveHandler:
    """Tracks C-MOVE progress from the status of each response.

    ``completed`` and ``total`` hold the sub-operation counts taken from
    the most recent response that carried them.
    """

    def __init__(self):
        self.total = 0
        self.completed = 0

    def handle_move_response(self, status, dataset):
        """Handle C-MOVE responses with progress tracking.

        Args:
            status: Status dataset of one C-MOVE response; ``Status`` and
                the sub-operation counters are read from it.
            dataset: Identifier of the response (unused).
        """
        if not status:
            # A falsy status carries no Status element to inspect.
            print("✗ Move failed: no response")
            return

        if status.Status == 0xFF00:  # Pending
            # Counters may be absent from a response; default to 0.
            done = getattr(status, 'NumberOfCompletedSuboperations', 0)
            left = getattr(status, 'NumberOfRemainingSuboperations', 0)
            self.completed = done
            self.total = done + left

            if self.total > 0:
                progress = (self.completed / self.total) * 100
                print(f"Progress: {progress:.1f}% ({self.completed}/{self.total})")
        elif status.Status == 0x0000:  # Success
            # Prefer the count on the final response; the value remembered
            # from the last pending response can lag behind.
            self.completed = getattr(
                status, 'NumberOfCompletedSuboperations', self.completed)
            print(f"✓ Move completed: {self.completed} images transferred")
        else:
            print(f"✗ Move failed: 0x{status.Status:04X}")
27.7 C-GET: Direct Retrieval
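C-GET retrieves images directly over the same association, so the requesting AE must also negotiate the SCP role for each storage SOP class it expects to receive: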
from pynetdicom import AE, evt
from pynetdicom.sop_class import (
PatientRootQueryRetrieveInformationModelGet,
CTImageStorage, MRImageStorage
)from pydicom.dataset import Dataset
import os
class GetHandler:
    """Handler for incoming C-STORE sub-operations during C-GET.

    Each received dataset is written to ``storage_dir`` and its path is
    appended to ``received_files``.
    """

    def __init__(self, storage_dir='Downloads'):
        self.storage_dir = storage_dir
        self.received_files = []

    def handle_store(self, event):
        """Handle incoming C-STORE from C-GET.

        Args:
            event: The C-STORE event; its ``dataset`` and ``file_meta``
                attributes are read.

        Returns:
            0x0000 (success) once the dataset has been written.
        """
        ds = event.dataset
        ds.file_meta = event.file_meta

        # Create storage directory
        os.makedirs(self.storage_dir, exist_ok=True)

        # The UID comes from the peer; keep only characters that are safe
        # in a file name so it cannot point outside storage_dir.
        sop_uid = ''.join(
            ch if (ch.isalnum() or ch in '._-') else '_'
            for ch in str(ds.SOPInstanceUID)
        ).strip('.') or 'unknown'

        # Save file
        filename = os.path.join(self.storage_dir, f'{sop_uid}.dcm')
        ds.save_as(filename, write_like_original=False)

        self.received_files.append(filename)
        print(f" Received: {os.path.basename(filename)}")

        return 0x0000  # Success
def get_study(study_uid, server_ip='localhost', server_port=104,
              server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Retrieve study using C-GET.

    Args:
        study_uid: Study Instance UID of the study to retrieve.
        server_ip: Host name or IP address of the peer AE.
        server_port: TCP port the peer listens on.
        server_ae_title: AE title of the peer.
        client_ae_title: AE title this client presents to the peer.

    Returns:
        List of file paths written by ``GetHandler`` (empty if the
        association failed or nothing was received).
    """
    # Imported locally: the module header does not import these names.
    from pynetdicom import build_role
    from pynetdicom.sop_class import StudyRootQueryRetrieveInformationModelGet

    # Create handler
    handler = GetHandler()

    # Create query dataset
    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'
    ds.StudyInstanceUID = study_uid

    # Only the Study Instance UID is supplied, so use the Study Root model;
    # Patient Root would additionally require PatientID.
    query_model = StudyRootQueryRetrieveInformationModelGet

    # Create Application Entity
    ae = AE(ae_title=client_ae_title)

    # Add presentation contexts for C-GET
    ae.add_requested_context(query_model)

    # Add storage contexts for incoming C-STORE
    storage_classes = [
        CTImageStorage,
        MRImageStorage,
        # Add more storage classes as needed
    ]
    # During C-GET the peer sends C-STORE requests back on this
    # association, so this AE must negotiate the SCP role for every
    # storage SOP class it wants to receive.
    roles = []
    for storage_class in storage_classes:
        ae.add_requested_context(storage_class)
        roles.append(build_role(storage_class, scp_role=True))

    # Set up handlers
    handlers = [(evt.EVT_C_STORE, handler.handle_store)]

    # Associate with peer
    assoc = ae.associate(
        server_ip,
        server_port,
        ae_title=server_ae_title,
        ext_neg=roles,
        evt_handlers=handlers
    )

    if not assoc.is_established:
        print("Association rejected")
        return handler.received_files

    print(f"Retrieving study {study_uid} using C-GET...")
    try:
        # Send C-GET request
        responses = assoc.send_c_get(ds, query_model)

        for (status, dataset) in responses:
            if not status:
                continue
            if status.Status == 0xFF00:  # Pending
                # Counters may be absent from a response; default to 0.
                completed = getattr(status, 'NumberOfCompletedSuboperations', 0)
                remaining = getattr(status, 'NumberOfRemainingSuboperations', 0)
                total = completed + remaining
                print(f"Progress: {completed}/{total}")
            elif status.Status == 0x0000:  # Success
                print("C-GET completed successfully")
                print(f"Retrieved {len(handler.received_files)} files")
            else:
                print(f"C-GET failed: 0x{status.Status:04X}")
    finally:
        assoc.release()

    return handler.received_files
27.8 Complete Workflow Example
Here’s a practical workflow combining multiple operations:
from pynetdicom import AE, debug_logger, evt
from pynetdicom.sop_class import *
from pydicom.dataset import Dataset
import os
from datetime import datetime, timedelta
class DicomWorkflow:
    """Complete DICOM workflow handler.

    Bundles C-ECHO, study-level C-FIND and C-MOVE against one server whose
    address and AE titles are fixed at construction time.
    """

    # Attributes placed (blank unless matched on) in every study query.
    _STUDY_KEYS = (
        'PatientName',
        'PatientID',
        'StudyInstanceUID',
        'StudyDate',
        'StudyDescription',
        'ModalitiesInStudy',
        'NumberOfStudyRelatedInstances',
    )

    def __init__(self, server_ip, server_port, server_ae_title,
                 client_ae_title='PYNETDICOM'):
        self.server_ip = server_ip
        self.server_port = server_port
        self.server_ae_title = server_ae_title
        self.client_ae_title = client_ae_title

    def _associate(self, sop_class):
        """Open an association with the server requesting *sop_class*."""
        ae = AE(ae_title=self.client_ae_title)
        ae.add_requested_context(sop_class)
        return ae.associate(self.server_ip, self.server_port,
                            ae_title=self.server_ae_title)

    @staticmethod
    def _summarize_study(dataset):
        """Convert one C-FIND response identifier into a plain dict.

        Attributes missing from the response become empty strings, since a
        server is not obliged to return every requested key.
        """
        def field(keyword):
            return getattr(dataset, keyword, '')

        return {
            'PatientName': str(field('PatientName')),
            'PatientID': field('PatientID'),
            'StudyUID': field('StudyInstanceUID'),
            'StudyDate': field('StudyDate'),
            'StudyDescription': field('StudyDescription'),
            'Modality': field('ModalitiesInStudy'),
            'NumImages': field('NumberOfStudyRelatedInstances'),
        }

    def _find_studies(self, **matching):
        """Run a STUDY-level C-FIND; *matching* maps keyword -> match value."""
        ds = Dataset()
        ds.QueryRetrieveLevel = 'STUDY'
        for keyword in self._STUDY_KEYS:
            setattr(ds, keyword, matching.get(keyword, ''))

        # Study Root is used because PatientID is not always supplied and
        # the Patient Root model requires it as a unique key at STUDY level.
        query_model = StudyRootQueryRetrieveInformationModelFind

        studies = []
        assoc = self._associate(query_model)
        if not assoc.is_established:
            return studies

        try:
            for (status, dataset) in assoc.send_c_find(ds, query_model):
                if status and status.Status in (0xFF00, 0xFF01) and dataset:
                    studies.append(self._summarize_study(dataset))
        finally:
            assoc.release()

        return studies

    def test_connection(self):
        """Test connectivity with C-ECHO.

        Returns:
            True if the server answered with status 0x0000, else False.
        """
        assoc = self._associate(Verification)
        if not assoc.is_established:
            return False

        try:
            status = assoc.send_c_echo()
        finally:
            assoc.release()

        # bool() so a falsy status yields False rather than the status
        # object itself.
        return bool(status) and status.Status == 0x0000

    def find_today_studies(self, modality=None):
        """Find all studies from today.

        Args:
            modality: Optional value for ModalitiesInStudy.

        Returns:
            List of study summary dicts (see ``_summarize_study``).
        """
        today = datetime.now().strftime('%Y%m%d')
        return self._find_studies(
            StudyDate=today,
            ModalitiesInStudy=modality if modality else '',
        )

    def find_studies_for_patient(self, patient_id):
        """Find all studies for a patient.

        Returns:
            List of study summary dicts (see ``_summarize_study``).
        """
        return self._find_studies(PatientID=patient_id)

    def move_study(self, study_uid, destination_ae):
        """Ask the server to C-MOVE a study to *destination_ae*.

        Returns:
            True if the final response had status 0x0000, else False.
        """
        ds = Dataset()
        ds.QueryRetrieveLevel = 'STUDY'
        ds.StudyInstanceUID = study_uid

        query_model = StudyRootQueryRetrieveInformationModelMove
        assoc = self._associate(query_model)
        if not assoc.is_established:
            return False

        succeeded = False
        try:
            responses = assoc.send_c_move(ds, destination_ae, query_model)
            for (status, _identifier) in responses:
                if status and status.Status == 0x0000:
                    succeeded = True
        finally:
            assoc.release()

        return succeeded

    def retrieve_and_process_study(self, study_uid, process_func=None):
        """
        Retrieve a study and optionally process each image.

        A temporary storage SCP is started locally, the server is asked to
        C-MOVE the study to it, and the SCP is shut down afterwards.

        Args:
            study_uid: Study Instance UID to retrieve.
            process_func: Optional callable invoked with each received
                dataset after it has been saved.

        Returns:
            List of file paths that were written.
        """
        # Imported locally: the header of this section does not import it.
        from pynetdicom import AllStoragePresentationContexts

        # Set up temporary storage SCP
        # NOTE(review): the server presumably needs this AE title and port
        # registered as a move destination - confirm in its configuration.
        storage_port = 11113
        storage_ae = 'TEMP_SCP'
        received_files = []

        def handle_store(event):
            """Handle incoming C-STORE"""
            ds = event.dataset
            ds.file_meta = event.file_meta

            # Save file
            os.makedirs('temp_storage', exist_ok=True)
            filepath = f'temp_storage/{ds.SOPInstanceUID}.dcm'
            ds.save_as(filepath, write_like_original=False)
            received_files.append(filepath)

            # Process if function provided
            if process_func:
                process_func(ds)

            return 0x0000

        # With block=False the server runs in the background and
        # start_server returns at once, so no extra thread or sleep is
        # needed before issuing the C-MOVE.
        scp_ae = AE(ae_title=storage_ae)
        scp_ae.supported_contexts = AllStoragePresentationContexts
        handlers = [(evt.EVT_C_STORE, handle_store)]
        scp = scp_ae.start_server(('', storage_port), evt_handlers=handlers,
                                  block=False)
        try:
            # Request C-MOVE
            self.move_study(study_uid, storage_ae)
        finally:
            # Free the port so a later retrieval can bind it again.
            scp.shutdown()

        return received_files

    def create_worklist(self, patient_id):
        """Create a worklist for a specific patient.

        Prints the patient's studies and returns them as a list of
        summary dicts (empty when none were found).
        """
        print(f"\n{'='*60}")
        print(f"WORKLIST FOR PATIENT: {patient_id}")
        print(f"{'='*60}\n")

        # Find studies
        studies = self.find_studies_for_patient(patient_id)

        if not studies:
            print("No studies found")
            return studies

        # Display worklist
        for i, study in enumerate(studies, 1):
            print(f"{i}. Study Date: {study['StudyDate']}")
            print(f" Description: {study['StudyDescription']}")
            print(f" Modality: {study['Modality']}")
            print(f" Images: {study['NumImages']}")
            print(f" Study UID: {study['StudyUID'][:30]}...")
            print()

        return studies
# Example usage
if __name__ == "__main__":
    # Initialize workflow
    workflow = DicomWorkflow(
        server_ip='192.168.1.100',
        server_port=104,
        server_ae_title='PACS_SERVER',
        client_ae_title='PYTHON_CLIENT'
    )

    # Test connection before issuing any query
    if workflow.test_connection():
        print("✓ Connection successful")

        # Find today's CT studies
        studies = workflow.find_today_studies(modality='CT')
        print(f"Found {len(studies)} CT studies today")

        # Create worklist for specific patient
        workflow.create_worklist('12345678')
    else:
        print("✗ Connection failed")
27.9 Advanced Features
27.9.1 Association Configuration
from pynetdicom import AE, build_context
from pynetdicom.presentation import PresentationContext
def create_custom_ae():
    """Create AE with custom configuration.

    Returns:
        An ``AE`` titled ``CUSTOM_AE`` with 30-second timeouts, a 16384-byte
        maximum PDU size, and one requested CT Image Storage context that
        offers four transfer syntaxes.
    """
    # Imported locally: the header of this section does not import it.
    from pynetdicom.sop_class import CTImageStorage
    from pydicom.uid import (
        ImplicitVRLittleEndian,
        ExplicitVRLittleEndian,
        JPEG2000
    )
    try:
        # Newer pydicom releases name the baseline JPEG syntax this way.
        from pydicom.uid import JPEGBaseline8Bit as JPEGBaseline
    except ImportError:
        from pydicom.uid import JPEGBaseline

    ae = AE(ae_title='CUSTOM_AE')

    # Configure network settings
    ae.acse_timeout = 30  # Association timeout
    ae.dimse_timeout = 30  # DIMSE message timeout
    ae.network_timeout = 30  # Network timeout
    ae.maximum_pdu_size = 16384  # Max PDU size

    # Add custom presentation contexts with transfer syntaxes
    transfer_syntaxes = [
        ImplicitVRLittleEndian,
        ExplicitVRLittleEndian,
        JPEGBaseline,
        JPEG2000
    ]

    # Add CT Image Storage with multiple transfer syntaxes
    context = build_context(CTImageStorage, transfer_syntaxes)
    ae.add_requested_context(context)

    return ae
27.9.2 Error Handling and Logging
import logging
from pynetdicom import debug_logger
# Enable debug logging
debug_logger()

# Or configure custom logging
# NOTE(review): these are presented as alternatives; running both may emit
# each pynetdicom message twice - keep whichever one you need.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('pynetdicom')
def robust_operation(func, *args, max_retries=3, **kwargs):
    """Wrapper for robust DICOM operations with retry.

    Calls ``func(*args, **kwargs)`` up to ``max_retries`` times, sleeping
    1, 2, 4, ... seconds between attempts.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        max_retries: Maximum number of attempts; must be at least 1.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The exception from the last attempt, re-raised unchanged.
    """
    # Imported locally so the helper does not depend on the module header
    # or on a module-level logger being defined first.
    import logging
    import time

    if max_retries < 1:
        # Otherwise the loop never runs and None is returned silently.
        raise ValueError("max_retries must be at least 1")

    log = logging.getLogger('pynetdicom')

    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            log.error(f"Attempt {attempt + 1} failed: {str(e)}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
These Python examples with pynetdicom should give you a complete toolkit for implementing DICOM operations in your radiology AI unit at Ramathibodi Hospital. The library is particularly well-suited for research and custom applications where you need fine-grained control over DICOM communications.