$ conda install -c conda-forge python-confluent-kafka
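The same client is also published on PyPI, so if you are not using conda you can install it with pip instead (note the PyPI package name is confluent-kafka, without the python- prefix):

$ pip install confluent-kafka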
from confluent_kafka import Producer
import sys

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
        sys.exit(1)

    topic = sys.argv[1]

    conf = {
        'bootstrap.servers': '<EventHub Namespace Name>.servicebus.windows.net:9093',  # Replace this with the namespace of the Event Hub you created.
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<SAS Connection Endpoint URL>',  # Paste the SAS endpoint issued just above.
        'client.id': 'nasa1515-producer'
    }

    # Create Producer instance
    p = Producer(**conf)

    # Delivery report callback: logs per-message success or failure
    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            # note: '%o' prints the offset in octal, which is why the sample
            # output below jumps from 7 to 10; use '%d' for decimal offsets
            sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' %
                             (msg.topic(), msg.partition(), msg.offset()))

    # Produce 1,000 messages (0-999) to the topic
    for i in range(0, 1000):  # I generated the message strings from the numbers 0-999.
        try:
            p.produce(topic, 'Kafka_data_nasa1515-' + str(i), callback=delivery_callback)
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        p.poll(0)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()
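The script above hardcodes the namespace and the SAS connection string. A minimal sketch of a safer variant reads both from environment variables; the variable names EVENTHUB_NAMESPACE and EVENTHUB_CONNECTION_STRING are my own placeholders, not part of the original script:

import os

# Assumed environment variable names -- adjust to your own setup.
conf = {
    'bootstrap.servers': '%s.servicebus.windows.net:9093' % os.environ['EVENTHUB_NAMESPACE'],  # namespace from the environment
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': os.environ['EVENTHUB_CONNECTION_STRING'],  # SAS connection string from the environment
    'client.id': 'nasa1515-producer'
}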
$ python3 producer.py <Topic Name>
...
...
python3 /home/nasa1515/docker/producer/Azure/producer.py nasatopic   # nasatopic -> the name of the Topic to create
(kafka) nasa1515@L-wslee:~$ python3 /home/nasa1515/docker/producer/Azure/producer.py nasatopic
% Waiting for 1000 deliveries
% Message delivered to nasatopic [0] @ 0
% Message delivered to nasatopic [0] @ 1
% Message delivered to nasatopic [0] @ 2
% Message delivered to nasatopic [0] @ 3
% Message delivered to nasatopic [0] @ 4
% Message delivered to nasatopic [0] @ 5
% Message delivered to nasatopic [0] @ 6
% Message delivered to nasatopic [0] @ 7
% Message delivered to nasatopic [0] @ 10
% Message delivered to nasatopic [0] @ 11
% Message delivered to nasatopic [0] @ 12
% Message delivered to nasatopic [0] @ 13
% Message delivered to nasatopic [0] @ 14
% Message delivered to nasatopic [0] @ 15
% Message delivered to nasatopic [0] @ 16
% Message delivered to nasatopic [0] @ 17
% Message delivered to nasatopic [0] @ 20
....
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
import logging
from pprint import pformat


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def print_usage_and_exit(program_name):
    sys.stderr.write('Usage: %s [options..] <consumer-group> <topic1> <topic2> ..\n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 2:
        print_usage_and_exit(sys.argv[0])

    group = argv[0]
    topics = argv[1:]

    conf = {
        'bootstrap.servers': '<Your NameSpace Name>.servicebus.windows.net:9093',  # Enter your Event Hubs namespace name.
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<SAS Token Endpoint URL>',  # Enter the SAS endpoint URL.
        'group.id': group,
        'client.id': '<Custom>',
        'request.timeout.ms': 60000,
        'session.timeout.ms': 60000,
        'default.topic.config': {'auto.offset.reset': 'smallest'}
    }

    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)
        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)
        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = intval

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(topics, on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=100.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                print(msg.value())
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        # Close down consumer to commit final offsets.
        c.close()
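One caveat: this consumer leaves offset commits to the client's background auto-commit, so an offset can be committed before its message is actually processed. If you need at-least-once processing, here is a minimal sketch of the usual pattern, assuming the same conf dict as above; this is not part of the original script:

from confluent_kafka import Consumer, KafkaException

conf['enable.auto.commit'] = False   # stop the client from committing in the background
c = Consumer(conf)
c.subscribe(['nasatopic'])

while True:
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        raise KafkaException(msg.error())
    print(msg.value())                           # process the message first...
    c.commit(message=msg, asynchronous=False)    # ...then commit its offset synchronously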
$ python3 Consum.py <Consumer_GROUP_ID> <TOPIC_NAME_1> <TOPIC_NAME_2> ...
...
...
python3 /home/nasa1515/docker/producer/Azure/Consum.py $Default nasatopic nasatopic
2022-05-12 10:11:59,390 WARNING  CONFWARN [nasa1515-consumer#consumer-1] [thrd:app]: Configuration property request.timeout.ms is a producer property and will be ignored by this consumer instance
Assignment: [TopicPartition{topic=nasatopic,partition=0,offset=-1001,error=None}]
b'Kafka_data_nasa1515-0'
b'Kafka_data_nasa1515-1'
b'Kafka_data_nasa1515-2'
b'Kafka_data_nasa1515-3'
b'Kafka_data_nasa1515-4'
b'Kafka_data_nasa1515-5'
b'Kafka_data_nasa1515-6'
b'Kafka_data_nasa1515-7'
b'Kafka_data_nasa1515-8'
b'Kafka_data_nasa1515-9'
b'Kafka_data_nasa1515-10'
b'Kafka_data_nasa1515-11'
b'Kafka_data_nasa1515-12'
...
...
import json
from google.auth import jwt
from google.cloud import pubsub_v1

# Load the service-account key and build JWT credentials with the Publisher audience
service_account_info = json.load(open("/home/nasa1515/docker/producer/GCP/data-cloocus-ffd800735dd1.json"))
credentials_pub = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials = jwt.Credentials.from_service_account_info(
    service_account_info, audience=credentials_pub
)

publisher = pubsub_v1.PublisherClient(credentials=credentials)

project_id = "data-cloocus"
topic_id = "pubsub_nasa1515"
topic_path = publisher.topic_path(project_id, topic_id)

# Publish 99 numbered messages, blocking on each delivery result
for n in range(1, 100):
    data_str = f"nasa1515_Pubsub_Massage : {n}"
    data = data_str.encode("utf-8")
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")
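Calling future.result() inside the loop blocks on every single message, so the 99 publishes go out one at a time. A sketch of the non-blocking variant, reusing the credentials and topic_path built above and waiting once at the end; this restructuring is my own, not the original flow:

from concurrent import futures
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient(credentials=credentials)  # credentials built as above
publish_futures = []

for n in range(1, 100):
    data = f"nasa1515_Pubsub_Massage : {n}".encode("utf-8")
    publish_futures.append(publisher.publish(topic_path, data))  # don't block per message

# Wait once for all outstanding publishes to complete
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)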
import json
from google.auth import jwt
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1

# Load the service-account key and build JWT credentials with the Subscriber audience
service_account_info = json.load(open("/home/nasa1515/docker/producer/GCP/data-cloocus-ffd800735dd1.json"))
credentials_sub = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
credentials = jwt.Credentials.from_service_account_info(
    service_account_info, audience=credentials_sub
)

project_id = "data-cloocus"
topic_id = "pubsub_nasa1515"
subscription = "pubsub_nasa1515-sub"

topic_name = f'projects/{project_id}/topics/{topic_id}'
subscription_name = f'projects/{project_id}/subscriptions/{subscription}'


def callback(message):
    print(message.data)
    message.ack()


# Use the JWT-credentialed client for the streaming pull
with pubsub_v1.SubscriberClient(credentials=credentials) as subscriber:
    # Create the subscription only if it does not exist yet
    try:
        response = subscriber.get_subscription(subscription=subscription_name)
        print(response)
    except NotFound:
        subscriber.create_subscription(name=subscription_name, topic=topic_name)

    future = subscriber.subscribe(subscription_name, callback)

    # Block until interrupted; Ctrl+C cancels the streaming pull
    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()
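If messages arrive faster than callback can ack them, the streaming pull will buffer the backlog in memory. A hedged sketch of flow control, assuming the subscriber, subscription_name, and callback from the script above; max_messages=10 is an arbitrary example value:

from google.cloud import pubsub_v1

# Cap how many messages the streaming pull holds in flight at once
flow_control = pubsub_v1.types.FlowControl(max_messages=10)  # example value, tune to your workload
future = subscriber.subscribe(subscription_name, callback, flow_control=flow_control)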