27  pynetdicom

I’ll show you how to implement DICOM operations using pynetdicom, which is the most comprehensive Python library for DICOM networking. Since you prefer Python and work with medical imaging at Ramathibodi Hospital, these examples will be directly applicable to your work.

27.1 Installation and Setup

pip install pynetdicom pydicom

27.2 C-ECHO: Testing Connectivity

The simplest operation to verify your DICOM connection:

from pynetdicom import AE, debug_logger
from pynetdicom.sop_class import Verification

def perform_echo(server_ip, server_port, server_ae_title, client_ae_title):
    """
    Verify DICOM connectivity by sending one C-ECHO to a peer.

    Opens an association that proposes only the Verification context,
    sends a single C-ECHO, reports the outcome with print() and releases
    the association again.

    Returns True only when the peer answers with status 0x0000; returns
    False when the association is not established, no response arrives,
    or the response carries any other status.
    """
    entity = AE(ae_title=client_ae_title)
    entity.add_requested_context(Verification)

    link = entity.associate(server_ip, server_port, ae_title=server_ae_title)
    if not link.is_established:
        print("Association rejected, aborted or never connected")
        return False

    print(f"Association established with {server_ae_title}")
    reply = link.send_c_echo()

    if not reply:
        # An empty reply means nothing usable came back from the peer
        print("No response received")
        succeeded = False
    else:
        code = reply.Status
        print(f'C-ECHO Response: 0x{code:04X}')
        succeeded = code == 0x0000
        if succeeded:
            print("C-ECHO successful!")
        else:
            print(f"C-ECHO failed with status: {code}")

    link.release()
    return succeeded

# Usage: runs at import time and stores perform_echo()'s boolean result in `success`
success = perform_echo('192.168.1.100', 104, 'PACS_SERVER', 'MY_WORKSTATION')

27.3 C-STORE SCU: Sending Images

Send DICOM files to a PACS server:

from pynetdicom import AE, StoragePresentationContexts
from pydicom import dcmread
import os

def send_dicom_file(file_path, server_ip, server_port,
                    server_ae_title, client_ae_title):
    """
    Transmit one DICOM file to a peer with a single C-STORE request.

    The file is parsed first, then an association proposing every
    context in StoragePresentationContexts is opened. The outcome is
    reported with print(); nothing is returned.
    """
    dataset = dcmread(file_path)

    sender = AE(ae_title=client_ae_title)
    sender.requested_contexts = StoragePresentationContexts

    link = sender.associate(server_ip, server_port, ae_title=server_ae_title)
    if not link.is_established:
        print("Association rejected or aborted")
        return

    print(f"Association established with {server_ae_title}")
    reply = link.send_c_store(dataset)

    # An empty reply produces no output at all
    if reply:
        print(f'C-STORE Response Status: 0x{reply.Status:04X}')
        if reply.Status != 0x0000:
            print(f"Store failed with status: 0x{reply.Status:04X}")
        else:
            print(f"Successfully stored: {dataset.SOPInstanceUID}")

    link.release()

def send_multiple_dicom_files(file_paths, server_ip, server_port,
                              server_ae_title, client_ae_title):
    """
    Send several DICOM files over one association using C-STORE.

    Each path in *file_paths* is read and sent in turn. A file counts as
    a success only when the peer answers with status 0x0000. A read
    error, a send error, a non-zero status or a missing response each
    count as exactly one failure for that file. A per-file line and a
    final summary are printed; nothing is returned.
    """
    ae = AE(ae_title=client_ae_title)
    ae.requested_contexts = StoragePresentationContexts

    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    if not assoc.is_established:
        print("Association failed")
        return

    print(f"Sending {len(file_paths)} files...")

    success_count = 0
    failure_count = 0

    for file_path in file_paths:
        name = os.path.basename(file_path)

        # Only the read and the send are guarded, so a failure is
        # counted once and result reporting cannot re-enter the handler.
        try:
            ds = dcmread(file_path)
            status = assoc.send_c_store(ds)
        except Exception as e:
            failure_count += 1
            print(f"✗ {name}: {str(e)}")
            continue

        if not status:
            # Empty status: no usable response, so there is no
            # Status attribute to format.
            failure_count += 1
            print(f"✗ {name}: no response from peer")
        elif status.Status == 0x0000:
            success_count += 1
            print(f"✓ {name}")
        else:
            failure_count += 1
            print(f"✗ {name}: 0x{status.Status:04X}")

    print(f"\nSummary: {success_count} successful, {failure_count} failed")
    assoc.release()

# Send entire directory
def send_directory(directory_path, server_ip, server_port,
                  server_ae_title, client_ae_title):
    """
    Walk *directory_path* recursively and send every file ending in '.dcm'.

    The collected paths are handed to send_multiple_dicom_files() in one
    batch. When nothing matches, a message is printed and no connection
    is attempted.
    """
    matches = [
        os.path.join(folder, entry)
        for folder, _subdirs, entries in os.walk(directory_path)
        for entry in entries
        if entry.endswith('.dcm')
    ]

    if not matches:
        print("No DICOM files found")
        return

    print(f"Found {len(matches)} DICOM files")
    send_multiple_dicom_files(matches, server_ip, server_port,
                              server_ae_title, client_ae_title)

27.4 C-STORE SCP: Receiving Images

Create a DICOM storage server to receive images:

from pynetdicom import AE, evt, AllStoragePresentationContexts
from pydicom.dataset import Dataset
import os
from datetime import datetime

class StorageHandler:
    """Save datasets received via C-STORE under storage_dir/StudyUID/SeriesUID/."""

    def __init__(self, storage_dir='DicomStorage'):
        # Root directory below which received files are written
        self.storage_dir = storage_dir
        # Number of datasets saved by this handler so far
        self.received_count = 0

    def handle_store(self, event):
        """
        Handle one C-STORE request event.

        Reads the dataset and file meta from *event*, writes the dataset
        to ``<storage_dir>/<StudyInstanceUID>/<SeriesInstanceUID>/
        <SOPInstanceUID>.dcm`` and logs a one-line summary.

        Returns 0x0000 once the file has been saved.
        """
        # Get the dataset and attach the file meta so it can be saved
        ds = event.dataset
        ds.file_meta = event.file_meta

        # Get study, series, and SOP UIDs for directory structure
        # NOTE(review): these values come from the sending peer and are
        # used as path components without validation - consider rejecting
        # anything that is not a plain dotted-numeric UID.
        study_uid = ds.StudyInstanceUID
        series_uid = ds.SeriesInstanceUID
        sop_uid = ds.SOPInstanceUID

        # Create directory structure: StudyUID/SeriesUID/
        path = os.path.join(self.storage_dir, study_uid, series_uid)
        os.makedirs(path, exist_ok=True)

        # Save the file
        filename = os.path.join(path, f'{sop_uid}.dcm')
        ds.save_as(filename, write_like_original=False)

        self.received_count += 1

        # The summary fields are read defensively: a dataset lacking one
        # of them must not turn an already-saved file into a failure.
        patient = getattr(ds, 'PatientName', 'N/A')
        modality = getattr(ds, 'Modality', 'N/A')
        description = getattr(ds, 'StudyDescription', 'N/A')
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Received: "
              f"{patient} - {modality} - {description}")
        print(f"  Saved to: {filename}")

        # Return success status
        return 0x0000

def start_storage_scp(port=11112, ae_title='STORAGE_SCP'):
    """
    Run a blocking Storage SCP that saves every dataset it receives.

    A StorageHandler with its default directory handles C-STORE
    requests, while handle_accepted / handle_released report association
    activity. The call blocks inside start_server() (block=True).
    """
    scp = AE(ae_title=ae_title)
    # Support every context listed in AllStoragePresentationContexts
    scp.supported_contexts = AllStoragePresentationContexts

    store_handler = StorageHandler()
    bindings = [
        (evt.EVT_C_STORE, store_handler.handle_store),
        (evt.EVT_ACCEPTED, handle_accepted),
        (evt.EVT_RELEASED, handle_released),
    ]

    for banner in (f"Starting Storage SCP on port {port}",
                   f"AE Title: {ae_title}",
                   "Press Ctrl+C to stop\n"):
        print(banner)

    # An empty host string is passed as the bind address
    listen_on = ('', port)
    scp.start_server(listen_on, evt_handlers=bindings, block=True)

def handle_accepted(event):
    """Print the remote AE title of the association carried by *event*."""
    peer_title = event.assoc.remote.ae_title
    print(f"Association accepted from {peer_title}")

def handle_released(event):
    """Print a fixed notice; *event* itself is not inspected."""
    notice = "Association released"
    print(notice)

# Run the server when executed as a script; start_storage_scp() is called
# with block=True internally, so this call does not return while serving
if __name__ == "__main__":
    start_storage_scp(port=11112, ae_title='PYNETDICOM_SCP')

27.5 C-FIND: Querying for Studies

Query PACS for patient studies, series, or images:

from pynetdicom import AE
from pynetdicom.sop_class import (
    PatientRootQueryRetrieveInformationModelFind,
    StudyRootQueryRetrieveInformationModelFind
)
from pydicom.dataset import Dataset

def find_studies(patient_id=None, patient_name=None, study_date=None,
                modality=None, server_ip='localhost', server_port=104,
                server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Run a STUDY-level C-FIND and collect the matching identifiers.

    Any filter left unset is sent empty, except the patient ID, which
    falls back to '*'. Each pending response that carries a dataset is
    printed and appended to the result.

    Returns the list of matching datasets (empty when the association
    is not established).
    """
    model = PatientRootQueryRetrieveInformationModelFind

    query = Dataset()
    query.QueryRetrieveLevel = 'STUDY'

    # Attributes driven by the caller's filters
    query.PatientID = patient_id or '*'
    query.PatientName = patient_name or ''
    query.StudyDate = study_date or ''
    query.ModalitiesInStudy = modality or ''

    # Remaining attributes are always sent with empty values
    for keyword in (
        'PatientBirthDate',
        'PatientSex',
        'StudyInstanceUID',
        'StudyTime',
        'StudyDescription',
        'AccessionNumber',
        'StudyID',
        'NumberOfStudyRelatedSeries',
        'NumberOfStudyRelatedInstances',
    ):
        setattr(query, keyword, '')

    requester = AE(ae_title=client_ae_title)
    requester.add_requested_context(model)
    link = requester.associate(server_ip, server_port, ae_title=server_ae_title)

    found = []

    if not link.is_established:
        print("Association rejected")
        return found

    print(f"Querying {server_ae_title} for studies...")

    for reply, match in link.send_c_find(query, model):
        if not reply:
            continue
        if reply.Status not in (0xFF00, 0xFF01):
            print(f'C-FIND completed with status: 0x{reply.Status:04X}')
            continue
        # Pending status: keep the identifier when one is attached
        if match:
            found.append(match)
            print(f"Found: {match.PatientName} - "
                  f"{match.StudyDate} - "
                  f"{match.StudyDescription}")

    link.release()
    print(f"\nTotal studies found: {len(found)}")
    return found

def find_series(study_instance_uid, server_ip='localhost', server_port=104,
               server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Run a SERIES-level C-FIND restricted to one study.

    Every pending response that carries a dataset is printed and
    collected. Returns the list of matching datasets; the list is empty
    when the association is not established.
    """
    model = PatientRootQueryRetrieveInformationModelFind

    query = Dataset()
    query.QueryRetrieveLevel = 'SERIES'
    query.StudyInstanceUID = study_instance_uid

    # Series-level attributes, all sent with empty values
    for keyword in ('SeriesInstanceUID', 'SeriesNumber', 'SeriesDescription',
                    'Modality', 'SeriesDate', 'SeriesTime',
                    'NumberOfSeriesRelatedInstances'):
        setattr(query, keyword, '')

    requester = AE(ae_title=client_ae_title)
    requester.add_requested_context(model)
    link = requester.associate(server_ip, server_port, ae_title=server_ae_title)

    collected = []
    if not link.is_established:
        return collected

    for reply, match in link.send_c_find(query, model):
        is_pending = reply and reply.Status in (0xFF00, 0xFF01)
        if not (is_pending and match):
            continue
        collected.append(match)
        print(f"Series {match.SeriesNumber}: "
              f"{match.Modality} - {match.SeriesDescription} "
              f"({match.NumberOfSeriesRelatedInstances} images)")

    link.release()
    return collected

# Advanced query with date range
def find_studies_date_range(start_date, end_date, modality=None,
                           server_ip='localhost', server_port=104,
                           server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Run a STUDY-level C-FIND whose StudyDate is '<start_date>-<end_date>'.

    Both dates are interpolated as given (the documented format is
    YYYYMMDD). Returns the datasets of all pending responses, or an
    empty list when the association is not established.
    """
    model = PatientRootQueryRetrieveInformationModelFind

    query = Dataset()
    query.QueryRetrieveLevel = 'STUDY'
    query.StudyDate = f'{start_date}-{end_date}'  # Range matching
    query.ModalitiesInStudy = modality or ''
    for keyword in ('PatientName', 'PatientID',
                    'StudyInstanceUID', 'StudyDescription'):
        setattr(query, keyword, '')

    requester = AE(ae_title=client_ae_title)
    requester.add_requested_context(model)
    link = requester.associate(server_ip, server_port, ae_title=server_ae_title)

    matches = []
    if link.is_established:
        matches = [
            match
            for reply, match in link.send_c_find(query, model)
            if reply and reply.Status in (0xFF00, 0xFF01) and match
        ]
        link.release()

    return matches

27.6 C-MOVE: Requesting Transfer to Another AE

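C-MOVE instructs the server to send images to a destination AE over a separate association, so the destination's AE title must already be registered on the server: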

from pynetdicom import AE
from pynetdicom.sop_class import (
    PatientRootQueryRetrieveInformationModelMove,
    StudyRootQueryRetrieveInformationModelMove
)
from pydicom.dataset import Dataset

def move_study(study_uid, destination_ae, server_ip='localhost',
               server_port=104, server_ae_title='PACS',
               client_ae_title='PYNETDICOM'):
    """
    Ask the peer to C-MOVE one study to *destination_ae*.

    Sends a STUDY-level C-MOVE identified by *study_uid* and prints the
    progress reported by pending responses, the final sub-operation
    counts on success, or the status code on failure. Nothing is
    returned.
    """
    # Create query dataset
    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'
    ds.StudyInstanceUID = study_uid

    # Create Application Entity
    ae = AE(ae_title=client_ae_title)
    ae.add_requested_context(PatientRootQueryRetrieveInformationModelMove)

    # Associate with peer
    assoc = ae.associate(server_ip, server_port, ae_title=server_ae_title)

    if not assoc.is_established:
        print("Association rejected")
        return

    print(f"Requesting move of study {study_uid} to {destination_ae}")

    # Send C-MOVE request
    responses = assoc.send_c_move(
        ds,
        destination_ae,
        PatientRootQueryRetrieveInformationModelMove
    )

    for (status, dataset) in responses:
        if not status:
            continue

        # The counters are read defensively so that a response without
        # them does not abort the loop.
        completed = getattr(status, 'NumberOfCompletedSuboperations', 0)

        if status.Status == 0xFF00:  # Pending
            remaining = getattr(status, 'NumberOfRemainingSuboperations', 0)
            # Computed outside the f-string: the original split one
            # replacement field across two literals, a syntax error.
            print(f"Progress: {completed}/{completed + remaining}")
        elif status.Status == 0x0000:
            failed = getattr(status, 'NumberOfFailedSuboperations', 0)
            warnings = getattr(status, 'NumberOfWarningSuboperations', 0)
            print("C-MOVE completed successfully")
            print(f"  Completed: {completed}")
            print(f"  Failed: {failed}")
            print(f"  Warnings: {warnings}")
        else:
            print(f"C-MOVE failed with status: 0x{status.Status:04X}")

    assoc.release()

def move_series(study_uid, series_uid, destination_ae,
                server_ip='localhost', server_port=104,
                server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Ask the peer to C-MOVE a single series to *destination_ae*.

    Prints a progress line for each pending response and a completion
    line on status 0x0000; other statuses and a failed association are
    silent. Nothing is returned.
    """
    model = PatientRootQueryRetrieveInformationModelMove

    identifier = Dataset()
    identifier.QueryRetrieveLevel = 'SERIES'
    identifier.StudyInstanceUID = study_uid
    identifier.SeriesInstanceUID = series_uid

    requester = AE(ae_title=client_ae_title)
    requester.add_requested_context(model)
    link = requester.associate(server_ip, server_port, ae_title=server_ae_title)

    if not link.is_established:
        return

    for reply, _identifier in link.send_c_move(identifier, destination_ae, model):
        if not reply:
            continue
        code = reply.Status
        if code == 0xFF00:
            left = reply.NumberOfRemainingSuboperations
            done = reply.NumberOfCompletedSuboperations
            print(f"Moving... {done}/{left + done}")
        elif code == 0x0000:
            print("Move completed successfully")

    link.release()

# Move with progress callback
class MoveHandler:
    """Track and print the progress of C-MOVE responses."""

    def __init__(self):
        # Sub-operations completed, and completed + remaining, as last reported
        self.completed = 0
        self.total = 0

    def handle_move_response(self, status, dataset):
        """
        Update the counters from one response and print its outcome.

        Pending (0xFF00) responses refresh completed/total and print a
        percentage when total is positive; 0x0000 prints a completion
        line; any other status prints a failure line.
        """
        code = status.Status

        if code == 0x0000:  # Success
            print(f"✓ Move completed: {self.completed} images transferred")
            return

        if code != 0xFF00:
            print(f"✗ Move failed: 0x{code:04X}")
            return

        # Pending: refresh the counters from the status fields
        done = status.NumberOfCompletedSuboperations
        self.completed = done
        self.total = done + status.NumberOfRemainingSuboperations

        if self.total > 0:
            percent = (self.completed / self.total) * 100
            print(f"Progress: {percent:.1f}% ({self.completed}/{self.total})")

27.7 C-GET: Direct Retrieval

C-GET retrieves images directly over the same association, so the requesting AE must also accept the incoming C-STORE sub-operations:

from pynetdicom import AE, evt
from pynetdicom.sop_class import (
    PatientRootQueryRetrieveInformationModelGet,
    CTImageStorage, MRImageStorage
)
from pydicom.dataset import Dataset
import os

class GetHandler:
    """Write datasets arriving as C-STORE sub-operations into one directory."""

    def __init__(self, storage_dir='Downloads'):
        # Paths of the files written so far, in arrival order
        self.received_files = []
        # Directory that receives the files (created on first store)
        self.storage_dir = storage_dir

    def handle_store(self, event):
        """
        Save the dataset carried by *event* as '<SOPInstanceUID>.dcm'.

        The path is recorded in received_files and 0x0000 is returned.
        """
        dataset = event.dataset
        dataset.file_meta = event.file_meta

        target_dir = self.storage_dir
        os.makedirs(target_dir, exist_ok=True)

        target = os.path.join(target_dir, f'{dataset.SOPInstanceUID}.dcm')
        dataset.save_as(target, write_like_original=False)

        self.received_files.append(target)
        print(f"  Received: {os.path.basename(target)}")

        return 0x0000  # Success

def get_study(study_uid, server_ip='localhost', server_port=104,
              server_ae_title='PACS', client_ae_title='PYNETDICOM'):
    """
    Retrieve one study with C-GET and save the received instances.

    A STUDY-level C-GET is sent for *study_uid*. The instances come back
    as C-STORE sub-operations on the same association and are written to
    disk by a GetHandler. Progress and the final outcome are printed.

    Returns the list of file paths written by the handler (empty when
    the association is not established or nothing was received).
    """
    # Local import keeps this snippet self-contained
    from pynetdicom import build_role

    # Create handler
    handler = GetHandler()

    # Create query dataset
    ds = Dataset()
    ds.QueryRetrieveLevel = 'STUDY'
    ds.StudyInstanceUID = study_uid

    # Create Application Entity
    ae = AE(ae_title=client_ae_title)

    # Add presentation contexts for C-GET
    ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)

    # Add storage contexts for incoming C-STORE
    storage_classes = [
        CTImageStorage,
        MRImageStorage,
        # Add more storage classes as needed
    ]

    # During C-GET the peer sends C-STORE requests back to us, so each
    # storage context must also be negotiated with the SCP role via
    # SCP/SCU Role Selection; without it the sub-operations cannot run.
    roles = []
    for storage_class in storage_classes:
        ae.add_requested_context(storage_class)
        roles.append(build_role(storage_class, scp_role=True))

    # Set up handlers
    handlers = [(evt.EVT_C_STORE, handler.handle_store)]

    # Associate with peer
    assoc = ae.associate(
        server_ip,
        server_port,
        ae_title=server_ae_title,
        ext_neg=roles,
        evt_handlers=handlers
    )

    if assoc.is_established:
        print(f"Retrieving study {study_uid} using C-GET...")

        # Send C-GET request
        responses = assoc.send_c_get(
            ds,
            PatientRootQueryRetrieveInformationModelGet
        )

        for (status, dataset) in responses:
            if status:
                if status.Status == 0xFF00:  # Pending
                    completed = status.NumberOfCompletedSuboperations
                    remaining = status.NumberOfRemainingSuboperations
                    total = completed + remaining
                    print(f"Progress: {completed}/{total}")

                elif status.Status == 0x0000:  # Success
                    print("C-GET completed successfully")
                    print(f"Retrieved {len(handler.received_files)} files")
                else:
                    print(f"C-GET failed: 0x{status.Status:04X}")

        assoc.release()
    else:
        print("Association rejected")

    return handler.received_files

27.8 Complete Workflow Example

Here’s a practical workflow combining multiple operations:

from pynetdicom import AE, debug_logger, evt
from pynetdicom.sop_class import *
from pydicom.dataset import Dataset
import os
from datetime import datetime, timedelta

class DicomWorkflow:
    """Bundle C-ECHO, C-FIND and C-MOVE calls against one configured peer.

    Every operation opens its own association using the connection
    details given to the constructor and releases it when done.
    """

    def __init__(self, server_ip, server_port, server_ae_title,
                 client_ae_title='PYNETDICOM'):
        # Address, port and AE title of the peer to talk to
        self.server_ip = server_ip
        self.server_port = server_port
        self.server_ae_title = server_ae_title
        # AE title this client presents to the peer
        self.client_ae_title = client_ae_title

    def _associate(self, sop_class):
        """Open an association with the peer proposing only *sop_class*."""
        ae = AE(ae_title=self.client_ae_title)
        ae.add_requested_context(sop_class)
        return ae.associate(self.server_ip, self.server_port,
                            ae_title=self.server_ae_title)

    @staticmethod
    def _study_identifier(patient_id='', study_date='', modality=''):
        """Build the STUDY-level C-FIND identifier shared by the find methods."""
        ds = Dataset()
        ds.QueryRetrieveLevel = 'STUDY'
        ds.PatientName = ''
        ds.PatientID = patient_id
        ds.StudyInstanceUID = ''
        ds.StudyDate = study_date
        ds.StudyDescription = ''
        ds.ModalitiesInStudy = modality
        ds.NumberOfStudyRelatedInstances = ''
        return ds

    @staticmethod
    def _summarize(dataset):
        """Convert one C-FIND match into a plain dict, tolerating absent attributes."""
        return {
            'PatientName': str(getattr(dataset, 'PatientName', '')),
            'PatientID': getattr(dataset, 'PatientID', ''),
            'StudyUID': getattr(dataset, 'StudyInstanceUID', ''),
            'StudyDate': getattr(dataset, 'StudyDate', ''),
            'StudyDescription': getattr(dataset, 'StudyDescription', ''),
            'Modality': getattr(dataset, 'ModalitiesInStudy', ''),
            'NumImages': getattr(dataset, 'NumberOfStudyRelatedInstances', ''),
        }

    def _query_studies(self, identifier):
        """Send *identifier* via C-FIND and return one summary dict per match."""
        model = PatientRootQueryRetrieveInformationModelFind
        assoc = self._associate(model)
        if not assoc.is_established:
            return []

        studies = []
        try:
            for (status, dataset) in assoc.send_c_find(identifier, model):
                if status and status.Status in (0xFF00, 0xFF01) and dataset:
                    studies.append(self._summarize(dataset))
        finally:
            # Release even if iterating the responses raises
            assoc.release()
        return studies

    def test_connection(self):
        """Test connectivity with C-ECHO; return True only on status 0x0000."""
        assoc = self._associate(Verification)
        if not assoc.is_established:
            return False

        status = assoc.send_c_echo()
        assoc.release()
        # bool() so that an empty status yields False, not the status object
        return bool(status) and status.Status == 0x0000

    def find_today_studies(self, modality=None):
        """
        Find all studies dated today, optionally filtered by *modality*.

        Returns a list of dicts with the keys PatientName, PatientID,
        StudyUID, StudyDate, StudyDescription, Modality and NumImages.
        """
        today = datetime.now().strftime('%Y%m%d')
        identifier = self._study_identifier(
            study_date=today,
            modality=modality if modality else '',
        )
        return self._query_studies(identifier)

    def find_studies_for_patient(self, patient_id):
        """
        Find all studies for *patient_id*.

        Returns a list of dicts with the same keys as find_today_studies().
        """
        return self._query_studies(self._study_identifier(patient_id=patient_id))

    def move_study(self, study_uid, destination_ae):
        """
        Ask the peer to C-MOVE *study_uid* to *destination_ae*.

        Returns True when a response with status 0x0000 is received,
        False otherwise (including when the association fails).
        """
        # Local import keeps the method independent of the star import
        from pynetdicom.sop_class import (
            PatientRootQueryRetrieveInformationModelMove,
        )
        # NOTE(review): the Patient Root model is used here to match the
        # find methods of this class - confirm the peer accepts a
        # STUDY-level request without a PatientID.
        model = PatientRootQueryRetrieveInformationModelMove

        ds = Dataset()
        ds.QueryRetrieveLevel = 'STUDY'
        ds.StudyInstanceUID = study_uid

        assoc = self._associate(model)
        if not assoc.is_established:
            return False

        succeeded = False
        try:
            for (status, _identifier) in assoc.send_c_move(
                    ds, destination_ae, model):
                if status and status.Status == 0x0000:
                    succeeded = True
        finally:
            assoc.release()
        return succeeded

    def retrieve_and_process_study(self, study_uid, process_func=None):
        """
        Retrieve a study via C-MOVE into a temporary local Storage SCP.

        A Storage SCP with AE title 'TEMP_SCP' is started on port 11113,
        the peer is asked to move *study_uid* to it, and the SCP is shut
        down afterwards. Each received dataset is saved under
        'temp_storage/' and, when *process_func* is given, passed to it.

        Returns the list of saved file paths.
        """
        from pynetdicom import AllStoragePresentationContexts

        # Temporary storage SCP settings
        # NOTE(review): the peer presumably has to know 'TEMP_SCP' and
        # how to reach this host on the port below - confirm its setup.
        storage_port = 11113
        storage_ae = 'TEMP_SCP'
        received_files = []

        def handle_store(event):
            """Handle incoming C-STORE"""
            ds = event.dataset
            ds.file_meta = event.file_meta

            # Save file
            os.makedirs('temp_storage', exist_ok=True)
            filepath = f'temp_storage/{ds.SOPInstanceUID}.dcm'
            ds.save_as(filepath, write_like_original=False)
            received_files.append(filepath)

            # Process if function provided
            if process_func:
                process_func(ds)

            return 0x0000

        scp = AE(ae_title=storage_ae)
        scp.supported_contexts = AllStoragePresentationContexts

        # block=False returns a server object straight away, so no
        # helper thread or sleep is needed to wait for the listener.
        server = scp.start_server(
            ('', storage_port),
            evt_handlers=[(evt.EVT_C_STORE, handle_store)],
            block=False,
        )
        try:
            # Request C-MOVE
            self.move_study(study_uid, storage_ae)
        finally:
            # Always free the port, even when the move fails
            server.shutdown()

        return received_files

    def create_worklist(self, patient_id):
        """
        Print and return the list of studies for *patient_id*.

        Returns the list from find_studies_for_patient(), or None when
        no studies were found.
        """
        print(f"\n{'='*60}")
        print(f"WORKLIST FOR PATIENT: {patient_id}")
        print(f"{'='*60}\n")

        # Find studies
        studies = self.find_studies_for_patient(patient_id)

        if not studies:
            print("No studies found")
            return

        # Display worklist
        for i, study in enumerate(studies, 1):
            print(f"{i}. Study Date: {study['StudyDate']}")
            print(f"   Description: {study['StudyDescription']}")
            print(f"   Modality: {study['Modality']}")
            print(f"   Images: {study['NumImages']}")
            print(f"   Study UID: {study['StudyUID'][:30]}...")
            print()

        return studies

# Example usage
if __name__ == "__main__":
    # Describe the peer to talk to and the AE title this client presents
    workflow = DicomWorkflow(
        '192.168.1.100',
        104,
        'PACS_SERVER',
        client_ae_title='PYTHON_CLIENT',
    )

    # Nothing else is attempted unless the C-ECHO check passes
    if not workflow.test_connection():
        print("✗ Connection failed")
    else:
        print("✓ Connection successful")

        # Today's CT studies
        ct_today = workflow.find_today_studies(modality='CT')
        studies = ct_today
        print(f"Found {len(studies)} CT studies today")

        # Worklist for one specific patient
        workflow.create_worklist('12345678')

27.9 Advanced Features

27.9.1 Association Configuration

from pynetdicom import AE, build_context
from pynetdicom.presentation import PresentationContext

def create_custom_ae():
    """
    Create an AE titled 'CUSTOM_AE' with custom timeouts and PDU size.

    The AE requests one CT Image Storage context that offers Implicit VR
    Little Endian, Explicit VR Little Endian, JPEG Baseline and JPEG 2000
    transfer syntaxes.

    Returns the configured AE.
    """
    from pydicom.uid import (
        ImplicitVRLittleEndian,
        ExplicitVRLittleEndian,
        JPEG2000
    )
    # JPEGBaseline8Bit is the newer pydicom name for the JPEG Baseline
    # UID; fall back to the legacy name when it is unavailable.
    try:
        from pydicom.uid import JPEGBaseline8Bit as jpeg_baseline
    except ImportError:
        from pydicom.uid import JPEGBaseline as jpeg_baseline
    # Imported locally so the function does not depend on another
    # snippet having imported it.
    from pynetdicom.sop_class import CTImageStorage

    ae = AE(ae_title='CUSTOM_AE')

    # Configure network settings
    ae.acse_timeout = 30  # Association timeout
    ae.dimse_timeout = 30  # DIMSE message timeout
    ae.network_timeout = 30  # Network timeout
    ae.maximum_pdu_size = 16384  # Max PDU size

    transfer_syntaxes = [
        ImplicitVRLittleEndian,
        ExplicitVRLittleEndian,
        jpeg_baseline,
        JPEG2000
    ]

    # Add CT Image Storage with multiple transfer syntaxes
    context = build_context(CTImageStorage, transfer_syntaxes)
    ae.add_requested_context(context)

    return ae

27.9.2 Error Handling and Logging

import logging
from pynetdicom import debug_logger

# Enable debug logging
debug_logger()

# Or configure custom logging
# NOTE(review): as written, both the debug_logger() call above and the
# basicConfig() call below run; keep only one if they are alternatives.
logging.basicConfig(level=logging.INFO)
# Module-level logger bound to the 'pynetdicom' logger name
logger = logging.getLogger('pynetdicom')

def robust_operation(func, *args, max_retries=3, **kwargs):
    """
    Call ``func(*args, **kwargs)``, retrying on any exception.

    Up to *max_retries* attempts are made. After a failed attempt the
    error is logged on the 'pynetdicom' logger and the call sleeps for
    ``2 ** attempt`` seconds (1, 2, 4, ...) before the next try.

    Returns the result of the first successful call, or None when
    *max_retries* is zero or negative (no attempt is made).

    Raises whatever the final attempt raised.
    """
    # Imported here because the module never imports `time`, so the
    # backoff sleep would otherwise raise NameError on the first retry.
    import logging
    import time

    # Same logger object as logging.getLogger('pynetdicom') elsewhere
    log = logging.getLogger('pynetdicom')

    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            log.error("Attempt %d failed: %s", attempt + 1, e)
            if attempt == max_retries - 1:
                # Out of attempts: surface the last error unchanged
                raise
            time.sleep(2 ** attempt)  # Exponential backoff

These Python examples with pynetdicom should give you a complete toolkit for implementing DICOM operations in your radiology AI unit at Ramathibodi Hospital. The library is particularly well-suited for research and custom applications where you need fine-grained control over DICOM communications.