giriiş
Nesnelerin İnterneti (IoT) cihazlarından bir veri gölüne veri gönderirken, daha çok veri işleme ve görselleştirme için aygıt veri yükünü buluttaki ek meta verilerle zenginleştirmeniz gerekebilir. Sınırı olan bant genişliğine haiz ortamlarda aygıt yükünün en aza indirilmesi yada buluttaki iş girişleriyle değiştirilmesi benzer biçimde bu verilerin aygıt yükünde bulunmamasının birden fazla sebebi olabilir. Mesela yapınak katındaki bir makine gün içinde değişik operatörlere atanabilir. Bu değişken iş verileri bir veritabanında saklanacaktır. Veri gölünüzde bu bilgilerin veri yüküyle beraber saklanmasına ihtiyacınız olabilir.
Bu blog yazısında zenginleştirilmiş IoT verilerini neredeyse gerçek zamanlı olarak bir veri gölüne iyi mi aktaracağınızı öğreneceksiniz.
Önkoşullar
- Bir AWS hesabı
- AWS Komut Satırı Arayüzü (AWS CLI). Yapılandırma için AWS CLI süratli kurulumuna bakın.
Durum tanımını kullanın
Lojistik şirketinizde sensör özellikli IoT cihazlarıyla donatılmış konteynerleriniz bulunduğunu varsayalım. Konteyner bir gemiye yüklendiğinde konteyner ID’si vapur ID’si ile ilişkilendirilir. IoT aygıt yükünü vapur kimliğiyle beraber veri gölünüzde saklamanız gerekir.
Bu şekilde bir kullanım durumunda sensör yükü, konteynere takılı IoT aletinden gelir. Sadece ilgili vapur kimliği yalnızca meta veri deposunda saklanır. Bundan dolayı yükün veri gölüne konulmadan ilkin vapur kimliği ile zenginleştirilmesi gerekir.
Çözüm mimarisi
Mimari diyagramda,
- IoT cihazları, AWS IoT Core bildiri komisyoncusuna belirli bir MQTT mevzu aletine/verilerine/verilerine veri akışı sağlar.DEVICE_ID. AWS IoT Core bildiri aracısı, desteklenen protokolleri kullanarak cihazların bildiri yayınlamasına ve mesajlara abone olmasına olanak tanır.
- AWS IoT kuralı, mevzusunda bir veri yükü olduğunda tetiklenir. Bu kullanım durumunda bir Amazon Kinesis Data Firehose eylemiyle yapılandırılır. Belirli bir MQTT mevzusunda bir bildiri olduğunda AWS hizmetlerini arayarak yada direkt Temel Alım hususi durumunu kullanarak AWS hizmetleriyle etkileşimde bulunmak için AWS IoT kurallarını kullanabilirsiniz.
- Amazon Kinesis Data Firehose, aygıt yüklerini boyuta yada zamana nazaran (hangisi ilkin gerçekleşirse) veri deposuna teslim etmeden ilkin arabelleğe alır. Kinesis Data Firehose, depolama yada işleme için hedeflere gerçek zamanlı veri akışı sağlar.
- Arabellek boyuta yada süre eşiğine ulaştığında Kinesis Data Firehose, Amazon DynamoDB’den alınan meta verilerle aygıt yüklerini toplu olarak zenginleştirmek için bir AWS Lambda işlevini çağırır. AWS Lambda, her tür uygulama için kodunuzu çalıştıran sunucusuz bir informasyon işlem hizmetidir. . Amazon DynamoDB, süratli performans elde eden, tam olarak yönetilen bir NoSQL veritabanıdır.
- Zenginleştirilmiş yükler, hedefe teslim edilmek suretiyle Kinesis Data Firehose’a geri gönderilir.
- Zenginleştirilmiş veriler, hedef olarak bir Amazon Simple Storage Service (Amazon S3) klasörüne yerleştirilir. Amazon S3, çeşitli kullanım durumları için her oranda veriyi depolayan bir nesne depolama hizmetidir.
AWS CloudFormation şablonu
AWS Cloudformation şablonunu kod deposundan indirin.
AWS CloudFormation şablonu, bu örnek kullanım senaristliğini çalıştırmak için lüzumlu tüm kaynakları dağıtır. AWS IoT kurallarına, Kinesis Data Firehose’a ve AWS Lambda işlevi kaynaklarına daha yakından bakalım.
AWS IoT kuralları deposu
IoTToFirehoseRule:
Type: AWS::IoT::TopicRule
Properties:
TopicRulePayload:
Actions:
-
Firehose:
RoleArn: !GetAtt IoTFirehosePutRecordRole.Arn
DeliveryStreamName: !Ref FirehoseDeliveryStream
Separator: "n"
AwsIotSqlVersion: ‘2016-03-23’
Description: This rule logs IoT payloads to S3 Bucket by aggregating in Kinesis Firehose.
RuleDisabled: false
Sql: !Ref IotKinesisRuleSQL
AWS IoT kuralı, yükten çıkarılacak kuralı ve verileri tetiklemek için IoT konusunu tanımlayan bir SQL parametresi alır.
- Örnekte, SQL parametresi varsayılan olarak SELECT *, mevzu(3) olarak ContainerId FROM ‘device/data/+’ olarak ayarlanmıştır. SELECT *, tüm payload’ın olduğu benzer biçimde alındığı ve ContainerId’nin MQTT mevzusundaki ikinci öğeden üretilip payload’a dahil edilmiş olduğu anlamına gelir.
- FROM ‘device/data/+’, AWS IoT kuralını tetikleyecek IoT konusunu açıklar. +, MQTT mevzuları için bir joker karakterdir ve IoT cihazları, bu kuralı tetiklemek için aygıt/veri/DEVICE_ID mevzusuna veri yüklerini yayınlayacaktır.
AWS IoT kuralı hem de eylemleri de tanımlar. Örnekte, hedef Kinesis Data Firehose dağıtım akışını tanımlayan bir Kinesis Data Firehose eylemini ve kayıtları bu dağıtım akışına koymak için ihtiyaç duyulan AWS Identity and Access Management (IAM) rolünü görebilirsiniz. Her kaydı ayırmak için bir ayırıcı seçilebilir; verilen örnekte bu, yeni bir satır karakteridir.
Kinesis Data Firehose dağıtım akışı deposu
FirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt IoTLogBucket.Arn
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 1
Prefix: device-data/
RoleARN: !GetAtt FirehosePutS3Role.Arn
ProcessingConfiguration:
Enabled: true
Processors:
- Type: Lambda
Parameters:
- ParameterName: LambdaArn
ParameterValue: !Sub '${FirehoseTransformLambda.Arn}:$LATEST'
- ParameterName: RoleArn
ParameterValue: !GetAtt FirehoseLambdaInvokeRole.Arn
Kinesis Data Firehose teslim akışı, akışın yerleştirileceği bir hedef tanımlamalıdır. Değişik hedef türlerini destek sunar. Mevcut hedef türlerini ve bunların kullanımını bu belgede bulabilirsiniz. Bu örnekte hedef olarak Amazon S3’ü kullanacaksınız.
Örnek Teslimat Akışı deposu aşağıdaki özellikleri tanımlar:
- BucketARN: toplanan verileri depolayacak hedef paket. Hedef paket CloudFormation yığını tarafınca oluşturulur.
- BufferingHints: veri arabelleğe alma için boyut ve süre eşiği. Bu örnekte neticeleri daha süratli görebilmek için sırasıyla 1MB ve 60 saniyeye ayarlanmıştır. İş gereksinimlerine nazaran ayarlanabilir. Bu eşiklerin düşük tutulması Lambda fonksiyonunun daha sık çağrılmasına niçin olacaktır. Eşikler yüksekse veriler veri deposuna daha azca çoğunlukla alınır, bu yüzden veri deposunda son olarak verilerin görülmesi süre alır.
- Önek: oluşturulan nesneler bu önekin altına yerleştirilecektir. Kinesis Data Firehose, verileri varsayılan olarak süre damgasına nazaran bölümlere ayırır. Bu örnekte nesneler, aygıt verileri/YYYY/AA/gg/HH klasörü altına yerleştirilecektir. Kinesis Data Firehose, dinamik bölümleme benzer biçimde veri bölümlemeye yönelik gelişmiş özelliklere haizdir. Veri gölü sorgulanırken verilerin bölümlenmesi önemlidir. Mesela Amazon Athena kullanarak verileri aygıt bazında sorgulamanız gerekiyorsa yalnız ilgili aygıt ID’sinin bölümünü taramanız, tarama süresini ve maliyetini mühim seviyede azaltacaktır. Bölümlemeye ilişkin ayrıntıları bu belgede bulabilirsiniz.
- RoleARN: Bu, PutObject’e Kinesis Data Firehose’a toplu verileri Amazon S3 klasörüne koyabilmesi için müsaade eden IAM rolüdür.
- ProcessingConfiguration: Kullanım örneğinde açıklanmış olduğu benzer biçimde, Lambda dönüşümü işlevi, IoT verilerini meta verilerle zenginleştirecektir. İşleme Yapılandırması örnekte Lambda işlevi olan işlemciyi tanımlar. Kinesis Data Firehose, her veri kümesi için verilerin dönüştürülmesi amacıyla bu Lambda işlevini çağıracaktır. Bu belgede veri işleme hakkında daha çok informasyon edinebilirsiniz.
Dönüşüm Lambda Fonksiyonu
Aşağıdaki Python kodunda görebileceğiniz benzer biçimde, Kinesis Data Firehose, her kaydın IoT cihazlarından gelen bir yük olduğu bir kayıt kümesini döndürür. İlk olarak base64 kodlu yük verilerinin kodu çözülür. Hemen sonra ilgili vapur kimliği, konteyner kimliğine dayalı olarak DynamoDB tablosundan gelir. Yük, vapur kimliğiyle zenginleştirilir ve base64’e geri kodlanır. Son olarak kayıt sıralaması Kinesis Data Firehose’a geri gönderilir.
Kinesis Data Firehose kayıtları aldıktan sonrasında bu tarz şeyleri toplu bir dosya olarak Amazon S3 klasörüne koyar.
import os
import boto3
import json
import base64
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['METADATA_TABLE'])
records = []
def function_handler(event, context):
for record in event["records"]:
# Get data field of the record in json format. It is a base64 encoded string.
json_data = json.loads(base64.b64decode(record["data"]))
container_id = json_data["containerId"]
# Get corresponding shipId from the DynamoDB table
res = table.get_item(Key={'containerId': container_id})
ddb_item = res["Item"]
ship_id = ddb_item["shipId"]
# Append shipId to the actual record data
enriched_data = json_data
enriched_data["shipId"] = ship_id
# Encode the enriched record to base64
json_string = json.dumps(enriched_data).encode("ascii")
b64_encoded_data = base64.b64encode(json_string).decode("ascii")
# Create a record with enriched data and return back to Firehose
rec = {'recordId': record["recordId"], 'result': 'Ok', 'data': b64_encoded_data}
records.append(rec)
return {'records': records}
Dağıtım
Yığını dağıtmak için bir terminalde aşağıdaki komutu çalıştırın.
aws cloudformation deploy --stack-name IoTKinesisDataPath --template-file IoTKinesisDataPath.yml --parameter-overrides IotKinesisRuleSQL="SELECT *, topic(3) as containerId FROM 'device/data/+'" --capabilities CAPABILITY_NAMED_IAM
Dağıtım tamamlandıktan sonrasında dağıtımın çıktısını görmek için bir terminalde aşağıdaki komutu çalıştırın.
aws cloudformation describe-stacks --stack-name IoTKinesisDataPath
IoTLogS3BucketName, MetadataTableName çıkış parametrelerini not edin.
Kontrol yapmak
Dağıtım tamamlandıktan sonrasında yapmanız ihtiyaç duyulan ilk şey, veri zenginleştirme için bir meta veri öğesi oluşturmaktır. DynamoDB tablosunda bir unsur oluşturmak için aşağıdaki komutu çalıştırın. ContainerId olarak cont1 ve shipId olarak ship1 olan bir unsur oluşturacaktır. Yer değişiklik yapmak IoTKinesisDataPath-MetadataTable-SAMPLE parametresini CloudFormation yığın dağıtımından DynamoDB tablo çıkış parametresiyle beraber kullanın.
aws dynamodb put-item --table-name IoTKinesisDataPath-MetadataTable-SAMPLE --item '{"containerId":{"S":"cont1"},"shipId":{"S":"ship1"}}'
Gerçek dünyadaki bir senaryoda cihazlar, yükleri belirli bir MQTT mevzusuna yayınlar. Bu örnekte, IoT cihazları oluşturmak yerine MQTT mevzularına yükleri yayınlamak için AWS CLI’yı kullanacaksınız. Örnek veri yükü AWS IoT Core yayınlamak için bir terminalde aşağıdaki komutu çalıştırın. Komutun payload alanına dikkat edin, cihazın sağlamış olduğu tek veri dinamik verilerdir.
aws iot-data publish --topic "device/data/cont1" --payload '{"temperature":20,"humidity":80,"latitude":0,"longitude":0}' --cli-binary-format raw-in-base64-out
Şimdi AWS Management Console’dan Amazon S3’e gidin ve CloudFormation yığınıyla oluşturulan klasörü seçin. Bu pakette aygıt verileri klasörünü görmelisiniz. Firehose dağıtım akışı için ayarlanan ara belleğe alma yapılandırması sebebiyle verilerin görünmesi 1 dakika kadar sürebilir. Device-data/YYYY/MM/gg/HH klasörüne giderseniz bir nesnenin oluşturulduğunu görmüş olacaksınız. Devam edin ve bu dosyayı açın. Dosyanın içinde ne olduğunun zenginleştirilmiş shipId alanına haiz veri yükü bulunduğunu görmüş olacaksınız.
{“ısı”: 20, “rutubet”: 80, “enlem”: 0, “boylam”: 0, “containerId”: “cont1”, “shipId”: “ship1”}
Mesele yok etme
Sistemde arıza olması durumunda probleminin kaynağını çözümleme etmek için aşağıdaki kaynaklar yararlı olabilir.
AWS IoT Core Rules Engine’i seyretmek için AWS IoT Core günlük kaydını etkinleştirmeniz gerekir. Bu, AWS IoT Core’da meydana gelen vakalar hakkında detaylı informasyon verecektir.
AWS Lambda, Amazon CloudWatch kullanılarak izlenebilir. Örnek CloudFormation şablonu, Lambda işlevi günlüğüne yönelik bir günlük grubu oluşturmak için lüzumlu izinlere haizdir.
Arıza durumunda Kinesis Data Firehose, AWS IoT Rules Engine eyleminde cihaz-veri öneki altında bir işleme başarısız klasörü oluşturacak, Lambda işlevini dönüştürecek yada Amazon S3 paketini oluşturacaktır. Arızanın ayrıntıları json nesneleri olarak okunabilir. Bu belgelerde daha çok informasyon bulabilirsiniz.
Temizlemek
Oluşturulan kaynakları temizlemek için ilk olarak Amazon S3 klasörünü boşaltın. değiştirerek aşağıdaki komutu çalıştırın. kova adı CloudFormation yığını tarafınca dağıtılan paketin adını içeren parametre. Mühim: Bu komut, paket içindeki tüm verileri geri döndürülemez şekilde siler.
aws s3 rm s3://bucket-name --recursive
Hemen sonra aşağıdaki komutu bir terminalde çalıştırarak CloudFormation yığınını silebilirsiniz.
aws cloudformation delete-stack --stack-name IoTKinesisDataPath
Çözüm
Bu blogda, AWS IoT Rules Engine ve Amazon Kinesis Data Firehose dağıtım akışını kullanarak IoT veri yüklerini meta verilerle zenginleştirmeye ve neredeyse gerçek zamanlı olarak bir veri gölünde uygun maliyetli bir halde depolamaya yönelik ortak bir modeli öğrendiniz. Tavsiye edilen çözüm ve CloudFormation şablonu, ölçeklenebilir bir IoT veri alma mimarisi için temel olarak kullanılabilir.
AWS IoT Core Rules Engine ve Amazon Kinesis Data Firehose hakkında daha çok informasyon edinebilirsiniz. AWS IoT Rules Engine’de MQTT mevzularını kullanmaya yönelik en iyi uygulamalar, mevzu yapılarınızı tanımlamanızda size yol gösterecektir.
Source: aws.amazon.com