मैंने एक स्क्रिप्ट लिखी है जिसमें कई धागे हैं (threading.Thread के साथ बनाया गया) Queue से queue.get_nowait() का उपयोग करके URL प्राप्त कर रहा है, और फिर HTML को संसाधित कर रहा है। मैं बहु-थ्रेडेड प्रोग्रामिंग में नया हूं, और मुझे queue.task_done() फ़ंक्शन के उद्देश्य को समझने में परेशानी हो रही है।

जब Queue खाली होता है, तो यह स्वतः ही queue.Empty अपवाद देता है। इसलिए मुझे समझ में नहीं आता कि प्रत्येक थ्रेड को task_done() फ़ंक्शन को कॉल करने की आवश्यकता है। हम जानते हैं कि जब कतार खाली हो जाती है, तो हमें यह सूचित करने की आवश्यकता क्यों होती है कि कार्यकर्ता थ्रेड्स ने अपना काम पूरा कर लिया है (जिसका कतार से कोई लेना-देना नहीं है, जब वे इससे URL प्राप्त कर लेते हैं) ?

क्या कोई मुझे एक कोड उदाहरण प्रदान कर सकता है (आदर्श रूप से urllib, फ़ाइल I/O, या फाइबोनैचि संख्याओं के अलावा कुछ और "हैलो" प्रिंट करना) जो मुझे दिखाता है कि व्यावहारिक अनुप्रयोगों में इस फ़ंक्शन का उपयोग कैसे किया जाएगा?

14
J. Taylor 3 अप्रैल 2018, 21:43

3 जवाब

सबसे बढ़िया उत्तर

Queue.task_done कामगारों के लाभ के लिए नहीं है। यह Queue.join का समर्थन करने के लिए है।


अगर मैं आपको कार्य असाइनमेंट का एक बॉक्स देता हूं, तो क्या मुझे इस बात की परवाह है कि आपने सब कुछ बॉक्स से बाहर कर दिया है?

नहीं। मुझे इस बात की परवाह है कि काम कब पूरा हो जाए। एक खाली डिब्बे को देखने से मुझे यह नहीं पता चलता। आप और 5 अन्य लोग अब भी उस सामान पर काम कर रहे होंगे जिसे आपने बॉक्स से बाहर निकाला था।

Queue.task_done कार्यकर्ताओं को यह कहने देता है कि कार्य कब हो गयाQueue.join के साथ किए जाने वाले सभी कार्यों की प्रतीक्षा करने वाला कोई व्यक्ति पर्याप्त task_done कॉल किए जाने तक प्रतीक्षा करेगा, न कि जब कतार खाली हो।

24
user2357112 supports Monica 13 पद 2018, 21:41

.task_done() का उपयोग .join() को चिह्नित करने के लिए किया जाता है कि प्रसंस्करण हो गया है।

यदि आप .join() का उपयोग करते हैं और प्रत्येक संसाधित आइटम के लिए .task_done() को कॉल नहीं करते हैं, तो आपकी स्क्रिप्ट हमेशा के लिए हैंग हो जाएगी।


एक संक्षिप्त उदाहरण की तरह कुछ नहीं है;

import logging
import queue
import threading
import time

items_queue = queue.Queue()
running = False


def items_queue_worker():
    while running:
        try:
            item = items_queue.get(timeout=0.01)
            if item is None:
                continue

            try:
                process_item(item)
            finally:
                items_queue.task_done()

        except queue.Empty:
            pass
        except:
            logging.exception('error while processing item')


def process_item(item):
    print('processing {} started...'.format(item))
    time.sleep(0.5)
    print('processing {} done'.format(item))


if __name__ == '__main__':
    running = True

    # Create 10 items_queue_worker threads
    worker_threads = 10
    for _ in range(worker_threads):
        threading.Thread(target=items_queue_worker).start()

    # Populate your queue with data
    for i in range(100):
        items_queue.put(i)

    # Wait for all items to finish processing
    items_queue.join()

    running = False
9
Jossef Harush 15 अगस्त 2019, 09:04

क्या कोई मुझे एक कोड उदाहरण प्रदान कर सकता है (आदर्श रूप से urllib, फ़ाइल I/O, या फाइबोनैचि संख्याओं के अलावा कुछ और "हैलो" प्रिंट करना) जो मुझे दिखाता है कि यह फ़ंक्शन व्यावहारिक अनुप्रयोगों में कैसे उपयोग किया जाएगा?

@ user2357112 का answer task_done के उद्देश्य की अच्छी तरह से व्याख्या करता है, लेकिन अनुरोधित उदाहरण का अभाव है। यहां एक फ़ंक्शन है जो फ़ाइलों की एक मनमानी संख्या के चेकसम की गणना करता है और प्रत्येक फ़ाइल नाम को संबंधित चेकसम पर मैप करने वाला एक निर्देश देता है। फ़ंक्शन के लिए आंतरिक, कार्य को कई थ्रेड्स में विभाजित किया गया है।

फ़ंक्शन Queue.join का उपयोग तब तक प्रतीक्षा करने के लिए करता है जब तक कि श्रमिक अपने असाइन किए गए कार्यों को पूरा नहीं कर लेते, इसलिए कॉलर को डिक्शनरी वापस करना सुरक्षित है। यह सभी फ़ाइलों के संसाधित होने की प्रतीक्षा करने का एक सुविधाजनक तरीका है, न कि उन्हें केवल डीक्यू किया जा रहा है।

import threading, queue, hashlib

def _work(q, checksums):
    while True:
        filename = q.get()
        if filename is None:
            q.put(None)
            break
        try:
            sha = hashlib.sha256()
            with open(filename, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b''):
                    sha.update(chunk)
            checksums[filename] = sha.digest()
        finally:
            q.task_done()

def calc_checksums(files):
    q = queue.Queue()
    checksums = {}
    for i in range(1):
        threading.Thread(target=_work, args=(q, checksums)).start()
    for f in files:
        q.put(f)
    q.join()
    q.put(None)  # tell workers to exit
    return checksums

GIL पर एक नोट: चूंकि hashlib में कोड चेकसम की गणना करते समय आंतरिक रूप से GIL को रिलीज़ करता है, कई थ्रेड्स का उपयोग करने से सिंगल-थ्रेडेड वेरिएंट की तुलना में एक औसत दर्जे का (पायथन संस्करण के आधार पर 1.75x-2x) स्पीडअप प्राप्त होता है।

3
user4815162342 15 पद 2018, 12:42