27 pynetdicom
I’ll show you how to implement DICOM operations using pynetdicom, which is the most comprehensive Python library for DICOM networking. Since you prefer Python and work with medical imaging at Ramathibodi Hospital, these examples will be directly applicable to your work.
27.1 Installation and Setup
pip install pynetdicom pydicom27.2 C-ECHO: Testing Connectivity
The simplest operation to verify your DICOM connection:
from pynetdicom import AE, debug_logger
from pynetdicom.sop_class import Verification
def perform_echo(server_ip, server_port, server_ae_title, client_ae_title):
"""
Perform C-ECHO to test DICOM connectivity
"""
# Create Application Entity
ae = AE(ae_title=client_ae_title)
# Add requested presentation context
ae.add_requested_context(Verification)
# Associate with peer AE
assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)
if assoc.is_established:
print(f"Association established with {server_ae_title}")
# Send C-ECHO request
status = assoc.send_c_echo()
if status:
# Check the status of the response
print(f'C-ECHO Response: 0x{status.Status:04X}')
if status.Status == 0x0000:
print("C-ECHO successful!")
result = True
else:
print(f"C-ECHO failed with status: {status.Status}")
result = False
else:
print("No response received")
result = False
# Release association
assoc.release()
else:
print("Association rejected, aborted or never connected")
result = False
return result
# Usage
success = perform_echo('192.168.1.100', 104, 'PACS_SERVER', 'MY_WORKSTATION')27.3 C-STORE SCU: Sending Images
Send DICOM files to a PACS server:
from pynetdicom import AE, StoragePresentationContexts
from pydicom import dcmread
import os
def send_dicom_file(file_path, server_ip, server_port,
server_ae_title, client_ae_title):
"""
Send a single DICOM file using C-STORE
"""
# Read the DICOM file
ds = dcmread(file_path)
# Create Application Entity with all storage contexts
ae = AE(ae_title=client_ae_title)
ae.requested_contexts = StoragePresentationContexts
# Associate with the peer AE
assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)
if assoc.is_established:
print(f"Association established with {server_ae_title}")
# Send C-STORE request
status = assoc.send_c_store(ds)
if status:
print(f'C-STORE Response Status: 0x{status.Status:04X}')
if status.Status == 0x0000:
print(f"Successfully stored: {ds.SOPInstanceUID}")
else:
print(f"Store failed with status: 0x{status.Status:04X}")
assoc.release()
else:
print("Association rejected or aborted")
def send_multiple_dicom_files(file_paths, server_ip, server_port,
server_ae_title, client_ae_title):
"""
Send multiple DICOM files in a single association
"""
ae = AE(ae_title=client_ae_title)
ae.requested_contexts = StoragePresentationContexts
assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)
if assoc.is_established:
print(f"Sending {len(file_paths)} files...")
success_count = 0
failure_count = 0
for file_path in file_paths:
try:
ds = dcmread(file_path)
status = assoc.send_c_store(ds)
if status and status.Status == 0x0000:
success_count += 1
print(f"✓ {os.path.basename(file_path)}")
else:
failure_count += 1
print(f"✗ {os.path.basename(file_path)}: "
f"0x{status.Status:04X}")
except Exception as e:
failure_count += 1
print(f"✗ {os.path.basename(file_path)}: {str(e)}")
print(f"\nSummary: {success_count} successful, {failure_count} failed")
assoc.release()
else:
print("Association failed")
# Send entire directory
def send_directory(directory_path, server_ip, server_port,
server_ae_title, client_ae_title):
"""
Send all DICOM files in a directory
"""
dicom_files = []
for root, dirs, files in os.walk(directory_path):
for file in files:
if file.endswith('.dcm'):
dicom_files.append(os.path.join(root, file))
if dicom_files:
print(f"Found {len(dicom_files)} DICOM files")
send_multiple_dicom_files(dicom_files, server_ip, server_port,
server_ae_title, client_ae_title)
else:
print("No DICOM files found")27.4 C-STORE SCP: Receiving Images
Create a DICOM storage server to receive images:
from pynetdicom import AE, evt, AllStoragePresentationContexts
from pydicom.dataset import Dataset
import os
from datetime import datetime
class StorageHandler:
"""Handler for C-STORE requests"""
def __init__(self, storage_dir='DicomStorage'):
self.storage_dir = storage_dir
self.received_count = 0
def handle_store(self, event):
"""
Handle C-STORE request events
"""
# Get the dataset
ds = event.dataset
# Add file meta information
ds.file_meta = event.file_meta
# Get study, series, and SOP UIDs for directory structure
study_uid = ds.StudyInstanceUID
series_uid = ds.SeriesInstanceUID
sop_uid = ds.SOPInstanceUID
# Create directory structure: StudyUID/SeriesUID/
path = os.path.join(self.storage_dir, study_uid, series_uid)
os.makedirs(path, exist_ok=True)
# Save the file
filename = os.path.join(path, f'{sop_uid}.dcm')
ds.save_as(filename, write_like_original=False)
self.received_count += 1
# Log the received file
print(f"[{datetime.now().strftime('%H:%M:%S')}] Received: "
f"{ds.PatientName} - {ds.Modality} - {ds.StudyDescription}")
print(f" Saved to: {filename}")
# Return success status
return 0x0000
def start_storage_scp(port=11112, ae_title='STORAGE_SCP'):
"""
Start a DICOM Storage SCP server
"""
# Create handler instance
handler = StorageHandler()
# Create Application Entity
ae = AE(ae_title=ae_title)
# Accept all storage presentation contexts
ae.supported_contexts = AllStoragePresentationContexts
# Set up handlers
handlers = [
(evt.EVT_C_STORE, handler.handle_store),
(evt.EVT_ACCEPTED, handle_accepted),
(evt.EVT_RELEASED, handle_released),
]
print(f"Starting Storage SCP on port {port}")
print(f"AE Title: {ae_title}")
print("Press Ctrl+C to stop\n")
# Start listening for associations
ae.start_server(('', port), evt_handlers=handlers, block=True)
def handle_accepted(event):
"""Handle association accepted event"""
print(f"Association accepted from {event.assoc.remote.ae_title}")
def handle_released(event):
"""Handle association released event"""
print(f"Association released")
# Run the server
if __name__ == "__main__":
start_storage_scp(port=11112, ae_title='PYNETDICOM_SCP')27.5 C-FIND: Querying for Studies
Query PACS for patient studies, series, or images:
from pynetdicom import AE
from pynetdicom.sop_class import (
PatientRootQueryRetrieveInformationModelFind,
StudyRootQueryRetrieveInformationModelFind
)
from pydicom.dataset import Dataset
def find_studies(patient_id=None, patient_name=None, study_date=None,
modality=None, server_ip='localhost', server_port=104,
server_ae_title='PACS', client_ae_title='PYNETDICOM'):
"""
Query for studies using C-FIND
"""
# Create query dataset
ds = Dataset()
ds.QueryRetrieveLevel = 'STUDY'
# Patient level
ds.PatientID = patient_id if patient_id else '*'
ds.PatientName = patient_name if patient_name else ''
ds.PatientBirthDate = ''
ds.PatientSex = ''
# Study level
ds.StudyInstanceUID = ''
ds.StudyDate = study_date if study_date else ''
ds.StudyTime = ''
ds.StudyDescription = ''
ds.AccessionNumber = ''
ds.StudyID = ''
ds.ModalitiesInStudy = modality if modality else ''
ds.NumberOfStudyRelatedSeries = ''
ds.NumberOfStudyRelatedInstances = ''
# Create Application Entity
ae = AE(ae_title=client_ae_title)
ae.add_requested_context(PatientRootQueryRetrieveInformationModelFind)
# Associate with peer
assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)
studies = []
if assoc.is_established:
print(f"Querying {server_ae_title} for studies...")
# Send C-FIND request
responses = assoc.send_c_find(ds, PatientRootQueryRetrieveInformationModelFind)
for (status, dataset) in responses:
if status:
if status.Status in (0xFF00, 0xFF01): # Pending
if dataset:
studies.append(dataset)
print(f"Found: {dataset.PatientName} - "
f"{dataset.StudyDate} - "
f"{dataset.StudyDescription}")
else:
print(f'C-FIND completed with status: 0x{status.Status:04X}')
assoc.release()
print(f"\nTotal studies found: {len(studies)}")
else:
print("Association rejected")
return studies
def find_series(study_instance_uid, server_ip='localhost', server_port=104,
server_ae_title='PACS', client_ae_title='PYNETDICOM'):
"""
Query for series within a study
"""
ds = Dataset()
ds.QueryRetrieveLevel = 'SERIES'
# Study level (must include)
ds.StudyInstanceUID = study_instance_uid
# Series level
ds.SeriesInstanceUID = ''
ds.SeriesNumber = ''
ds.SeriesDescription = ''
ds.Modality = ''
ds.SeriesDate = ''
ds.SeriesTime = ''
ds.NumberOfSeriesRelatedInstances = ''
ae = AE(ae_title=client_ae_title)
ae.add_requested_context(PatientRootQueryRetrieveInformationModelFind)
assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)
series_list = []
if assoc.is_established:
responses = assoc.send_c_find(ds, PatientRootQueryRetrieveInformationModelFind)
for (status, dataset) in responses:
if status and status.Status in (0xFF00, 0xFF01) and dataset:
series_list.append(dataset)
print(f"Series {dataset.SeriesNumber}: "
f"{dataset.Modality} - {dataset.SeriesDescription} "
f"({dataset.NumberOfSeriesRelatedInstances} images)")
assoc.release()
return series_list
# Advanced query with date range
def find_studies_date_range(start_date, end_date, modality=None,
server_ip='localhost', server_port=104,
server_ae_title='PACS', client_ae_title='PYNETDICOM'):
"""
Find studies within a date range
Date format: YYYYMMDD
"""
ds = Dataset()
ds.QueryRetrieveLevel = 'STUDY'
# Patient level
ds.PatientName = ''
ds.PatientID = ''
# Study level with date range
ds.StudyInstanceUID = ''
ds.StudyDate = f'{start_date}-{end_date}' # Range matching
ds.StudyDescription = ''
ds.ModalitiesInStudy = modality if modality else ''
ae = AE(ae_title=client_ae_title)
ae.add_requested_context(PatientRootQueryRetrieveInformationModelFind)
assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)
studies = []
if assoc.is_established:
responses = assoc.send_c_find(ds, PatientRootQueryRetrieveInformationModelFind)
for (status, dataset) in responses:
if status and status.Status in (0xFF00, 0xFF01) and dataset:
studies.append(dataset)
assoc.release()
return studies27.6 C-MOVE: Requesting Transfer to Another AE
C-MOVE instructs the server to send images to a destination:
from pynetdicom import AE
from pynetdicom.sop_class import (
PatientRootQueryRetrieveInformationModelMove,
StudyRootQueryRetrieveInformationModelMove
)
from pydicom.dataset import Dataset
def move_study(study_uid, destination_ae, server_ip='localhost',
server_port=104, server_ae_title='PACS',
client_ae_title='PYNETDICOM'):
"""
Request C-MOVE to send study to destination AE
"""
# Create query dataset
ds = Dataset()
ds.QueryRetrieveLevel = 'STUDY'
ds.StudyInstanceUID = study_uid
# Create Application Entity
ae = AE(ae_title=client_ae_title)
ae.add_requested_context(PatientRootQueryRetrieveInformationModelMove)
# Associate with peer
assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)
if assoc.is_established:
print(f"Requesting move of study {study_uid} to {destination_ae}")
# Send C-MOVE request
responses = assoc.send_c_move(
ds,
destination_ae,
PatientRootQueryRetrieveInformationModelMove
)
for (status, dataset) in responses:
if status:
# Check if pending (0xFF00)
if status.Status == 0xFF00:
print(f"Progress: {status.NumberOfCompletedSuboperations}/"
f"{status.NumberOfCompletedSuboperations + "
f"status.NumberOfRemainingSuboperations}")
elif status.Status == 0x0000:
print(f"C-MOVE completed successfully")
print(f" Completed: {status.NumberOfCompletedSuboperations}")
print(f" Failed: {status.NumberOfFailedSuboperations}")
print(f" Warnings: {status.NumberOfWarningSuboperations}")
else:
print(f"C-MOVE failed with status: 0x{status.Status:04X}")
assoc.release()
else:
print("Association rejected")
def move_series(study_uid, series_uid, destination_ae,
server_ip='localhost', server_port=104,
server_ae_title='PACS', client_ae_title='PYNETDICOM'):
"""
Move specific series
"""
ds = Dataset()
ds.QueryRetrieveLevel = 'SERIES'
ds.StudyInstanceUID = study_uid
ds.SeriesInstanceUID = series_uid
ae = AE(ae_title=client_ae_title)
ae.add_requested_context(PatientRootQueryRetrieveInformationModelMove)
assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)
if assoc.is_established:
responses = assoc.send_c_move(
ds,
destination_ae,
PatientRootQueryRetrieveInformationModelMove
)
for (status, dataset) in responses:
if status:
if status.Status == 0xFF00:
remaining = status.NumberOfRemainingSuboperations
completed = status.NumberOfCompletedSuboperations
print(f"Moving... {completed}/{remaining + completed}")
elif status.Status == 0x0000:
print("Move completed successfully")
assoc.release()
# Move with progress callback
class MoveHandler:
def __init__(self):
self.total = 0
self.completed = 0
def handle_move_response(self, status, dataset):
"""Handle C-MOVE responses with progress tracking"""
if status.Status == 0xFF00: # Pending
self.completed = status.NumberOfCompletedSuboperations
self.total = (status.NumberOfCompletedSuboperations +
status.NumberOfRemainingSuboperations)
if self.total > 0:
progress = (self.completed / self.total) * 100
print(f"Progress: {progress:.1f}% ({self.completed}/{self.total})")
elif status.Status == 0x0000: # Success
print(f"✓ Move completed: {self.completed} images transferred")
else:
print(f"✗ Move failed: 0x{status.Status:04X}")27.7 C-GET: Direct Retrieval
C-GET retrieves images directly over the same association:
from pynetdicom import AE, evt
from pynetdicom.sop_class import (
PatientRootQueryRetrieveInformationModelGet,
CTImageStorage, MRImageStorage
)
from pydicom.dataset import Dataset
import os
class GetHandler:
"""Handler for incoming C-STORE sub-operations during C-GET"""
def __init__(self, storage_dir='Downloads'):
self.storage_dir = storage_dir
self.received_files = []
def handle_store(self, event):
"""Handle incoming C-STORE from C-GET"""
ds = event.dataset
ds.file_meta = event.file_meta
# Create storage directory
os.makedirs(self.storage_dir, exist_ok=True)
# Save file
filename = os.path.join(self.storage_dir, f'{ds.SOPInstanceUID}.dcm')
ds.save_as(filename, write_like_original=False)
self.received_files.append(filename)
print(f" Received: {os.path.basename(filename)}")
return 0x0000 # Success
def get_study(study_uid, server_ip='localhost', server_port=104,
server_ae_title='PACS', client_ae_title='PYNETDICOM'):
"""
Retrieve study using C-GET
"""
# Create handler
handler = GetHandler()
# Create query dataset
ds = Dataset()
ds.QueryRetrieveLevel = 'STUDY'
ds.StudyInstanceUID = study_uid
# Create Application Entity
ae = AE(ae_title=client_ae_title)
# Add presentation contexts for C-GET
ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)
# Add storage contexts for incoming C-STORE
storage_classes = [
CTImageStorage,
MRImageStorage,
# Add more storage classes as needed
]
for storage_class in storage_classes:
ae.add_requested_context(storage_class)
# Set up handlers
handlers = [(evt.EVT_C_STORE, handler.handle_store)]
# Associate with peer
assoc = ae.associate(
server_ip,
server_port,
ae_title=server_ae_title,
evt_handlers=handlers
)
if assoc.is_established:
print(f"Retrieving study {study_uid} using C-GET...")
# Send C-GET request
responses = assoc.send_c_get(
ds,
PatientRootQueryRetrieveInformationModelGet
)
for (status, dataset) in responses:
if status:
if status.Status == 0xFF00: # Pending
completed = status.NumberOfCompletedSuboperations
remaining = status.NumberOfRemainingSuboperations
total = completed + remaining
print(f"Progress: {completed}/{total}")
elif status.Status == 0x0000: # Success
print(f"C-GET completed successfully")
print(f"Retrieved {len(handler.received_files)} files")
else:
print(f"C-GET failed: 0x{status.Status:04X}")
assoc.release()
else:
print("Association rejected")
return handler.received_files27.8 Complete Workflow Example
Here’s a practical workflow combining multiple operations:
from pynetdicom import AE, debug_logger, evt
from pynetdicom.sop_class import *
from pydicom.dataset import Dataset
import os
from datetime import datetime, timedelta
class DicomWorkflow:
"""Complete DICOM workflow handler"""
def __init__(self, server_ip, server_port, server_ae_title,
client_ae_title='PYNETDICOM'):
self.server_ip = server_ip
self.server_port = server_port
self.server_ae_title = server_ae_title
self.client_ae_title = client_ae_title
def test_connection(self):
"""Test connectivity with C-ECHO"""
ae = AE(ae_title=self.client_ae_title)
ae.add_requested_context(Verification)
assoc = ae.associate(self.server_ip, self.server_port,
ae_title=self.server_ae_title)
if assoc.is_established:
status = assoc.send_c_echo()
assoc.release()
return status and status.Status == 0x0000
return False
def find_today_studies(self, modality=None):
"""Find all studies from today"""
today = datetime.now().strftime('%Y%m%d')
ds = Dataset()
ds.QueryRetrieveLevel = 'STUDY'
ds.PatientName = ''
ds.PatientID = ''
ds.StudyInstanceUID = ''
ds.StudyDate = today
ds.StudyDescription = ''
ds.ModalitiesInStudy = modality if modality else ''
ds.NumberOfStudyRelatedInstances = ''
ae = AE(ae_title=self.client_ae_title)
ae.add_requested_context(PatientRootQueryRetrieveInformationModelFind)
studies = []
assoc = ae.associate(self.server_ip, self.server_port,
ae_title=self.server_ae_title)
if assoc.is_established:
responses = assoc.send_c_find(
ds,
PatientRootQueryRetrieveInformationModelFind
)
for (status, dataset) in responses:
if status and status.Status in (0xFF00, 0xFF01) and dataset:
studies.append({
'PatientName': str(dataset.PatientName),
'PatientID': dataset.PatientID,
'StudyUID': dataset.StudyInstanceUID,
'StudyDescription': dataset.StudyDescription,
'Modality': dataset.ModalitiesInStudy,
'NumImages': dataset.NumberOfStudyRelatedInstances
})
assoc.release()
return studies
def retrieve_and_process_study(self, study_uid, process_func=None):
"""
Retrieve a study and optionally process each image
"""
# Set up temporary storage SCP
storage_port = 11113
storage_ae = 'TEMP_SCP'
received_files = []
def handle_store(event):
"""Handle incoming C-STORE"""
ds = event.dataset
ds.file_meta = event.file_meta
# Save file
os.makedirs('temp_storage', exist_ok=True)
filepath = f'temp_storage/{ds.SOPInstanceUID}.dcm'
ds.save_as(filepath, write_like_original=False)
received_files.append(filepath)
# Process if function provided
if process_func:
process_func(ds)
return 0x0000
# Start storage SCP in thread
import threading
def run_scp():
ae = AE(ae_title=storage_ae)
ae.supported_contexts = AllStoragePresentationContexts
handlers = [(evt.EVT_C_STORE, handle_store)]
ae.start_server(('', storage_port), evt_handlers=handlers,
block=False)
scp_thread = threading.Thread(target=run_scp)
scp_thread.daemon = True
scp_thread.start()
# Wait for SCP to start
import time
time.sleep(2)
# Request C-MOVE
self.move_study(study_uid, storage_ae)
return received_files
def create_worklist(self, patient_id):
"""Create a worklist for a specific patient"""
print(f"\n{'='*60}")
print(f"WORKLIST FOR PATIENT: {patient_id}")
print(f"{'='*60}\n")
# Find studies
studies = self.find_studies_for_patient(patient_id)
if not studies:
print("No studies found")
return
# Display worklist
for i, study in enumerate(studies, 1):
print(f"{i}. Study Date: {study['StudyDate']}")
print(f" Description: {study['StudyDescription']}")
print(f" Modality: {study['Modality']}")
print(f" Images: {study['NumImages']}")
print(f" Study UID: {study['StudyUID'][:30]}...")
print()
return studies
def find_studies_for_patient(self, patient_id):
"""Find all studies for a patient"""
ds = Dataset()
ds.QueryRetrieveLevel = 'STUDY'
ds.PatientID = patient_id
ds.PatientName = ''
ds.StudyInstanceUID = ''
ds.StudyDate = ''
ds.StudyDescription = ''
ds.ModalitiesInStudy = ''
ds.NumberOfStudyRelatedInstances = ''
ae = AE(ae_title=self.client_ae_title)
ae.add_requested_context(PatientRootQueryRetrieveInformationModelFind)
studies = []
assoc = ae.associate(self.server_ip, self.server_port,
ae_title=self.server_ae_title)
if assoc.is_established:
responses = assoc.send_c_find(
ds,
PatientRootQueryRetrieveInformationModelFind
)
for (status, dataset) in responses:
if status and status.Status in (0xFF00, 0xFF01) and dataset:
studies.append({
'PatientName': str(dataset.PatientName),
'StudyUID': dataset.StudyInstanceUID,
'StudyDate': dataset.StudyDate,
'StudyDescription': dataset.StudyDescription,
'Modality': dataset.ModalitiesInStudy,
'NumImages': dataset.NumberOfStudyRelatedInstances
})
assoc.release()
return studies
# Example usage
if __name__ == "__main__":
# Initialize workflow
workflow = DicomWorkflow(
server_ip='192.168.1.100',
server_port=104,
server_ae_title='PACS_SERVER',
client_ae_title='PYTHON_CLIENT'
)
# Test connection
if workflow.test_connection():
print("✓ Connection successful")
# Find today's CT studies
studies = workflow.find_today_studies(modality='CT')
print(f"Found {len(studies)} CT studies today")
# Create worklist for specific patient
workflow.create_worklist('12345678')
else:
print("✗ Connection failed")27.9 Advanced Features
27.9.1 Association Configuration
from pynetdicom import AE, build_context
from pynetdicom.presentation import PresentationContext
def create_custom_ae():
"""Create AE with custom configuration"""
ae = AE(ae_title='CUSTOM_AE')
# Configure network settings
ae.acse_timeout = 30 # Association timeout
ae.dimse_timeout = 30 # DIMSE message timeout
ae.network_timeout = 30 # Network timeout
ae.maximum_pdu_size = 16384 # Max PDU size
# Add custom presentation contexts with transfer syntaxes
from pydicom.uid import (
ImplicitVRLittleEndian,
ExplicitVRLittleEndian,
JPEGBaseline,
JPEG2000
)
transfer_syntaxes = [
ImplicitVRLittleEndian,
ExplicitVRLittleEndian,
JPEGBaseline,
JPEG2000
]
# Add CT Image Storage with multiple transfer syntaxes
context = build_context(CTImageStorage, transfer_syntaxes)
ae.add_requested_context(context)
return ae27.9.2 Error Handling and Logging
import logging
from pynetdicom import debug_logger
# Enable debug logging
debug_logger()
# Or configure custom logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('pynetdicom')
def robust_operation(func, *args, max_retries=3, **kwargs):
"""Wrapper for robust DICOM operations with retry"""
for attempt in range(max_retries):
try:
result = func(*args, **kwargs)
return result
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoffThese Python examples with pynetdicom should give you a complete toolkit for implementing DICOM operations in your radiology AI unit at Ramathibodi Hospital. The library is particularly well-suited for research and custom applications where you need fine-grained control over DICOM communications.