ItemPipeline

After an item has been scraped by a spider, it is sent to the Item Pipeline which processes it through several components that are executed sequentially.

Each item pipeline component (sometimes referred as just “Item Pipeline”) is a Python class that implements a simple method. They receive an item and perform an action over it, also deciding if the item should continue through the pipeline or be dropped and no longer processed.

SpiderでItemがスクレイプされた後、ItemPipelineに送られ、処理される。

Typical uses of item pipelines are:

  • cleansing HTML data
  • validating scraped data (checking that the items contain certain fields)
  • checking for duplicates (and dropping them)
  • storing the scraped item in a database

典型的なItemPipelineの例は

  • HTMLのクレンジング
  • バリデーション
  • 重複チェック
  • データベースへの保存

取得したデータの保存に限らず、クレンジング/バリデーション/重複チェックといったデータのチェック整形もパイプラインが担う。

Activating an Item Pipeline component

To activate an Item Pipeline component you must add its class to the ITEM_PIPELINES setting, like in the following example:

1
2
3
4
>ITEM_PIPELINES = {
'myproject.pipelines.PricePipeline': 300,
'myproject.pipelines.JsonWriterPipeline': 800,
>.}

The integer values you assign to classes in this setting determine the order in which they run: items go through from lower valued to higher valued classes. It’s customary to define these numbers in the 0-1000 range.

settings.pyのITEM_PIPELINESで有効化。0~1000の値で実行順序を制御している。

MongoDBの例

公式サイトのpymongoを使いMongoDBへ追加するサンプル

__init__from_crawler()はパイプライン自体を生成し、DBに関する設定値を読み取っている。
crawler.settingsからsettings.pyで設定したパラメーターを取得することができる。

open_spider()close_spider()はDBへの接続/切断処理を行い、process_item()で生成されたitemを追加していく。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pymongo

class MongoPipeline:

collection_name = 'scrapy_items'

def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db

@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
)

def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]

def close_spider(self, spider):
self.client.close()

def process_item(self, item, spider):
self.db[self.collection_name].insert_one(dict(item))
return item

DynamoDBの例

Scrapy公式ではないが、GitHubでscrapy-dynamodbというScrapy向けのItemPipelineが公開されている。
基本的な記述は公式のMongoDBの例と同じだが、余剰なコードが多い。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import datetime
import boto3


def default_encoder(value):
if isinstance(value, datetime.datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(value, datetime.date):
return value.strftime('%Y-%m-%d')
elif isinstance(value, datetime.time):
return value.strftime('%H:%M:%S')
else:
return value


class DynamoDbPipeline(object):

def __init__(self, aws_access_key_id, aws_secret_access_key, region_name,
table_name, encoder=default_encoder):
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.region_name = region_name
self.table_name = table_name
self.encoder = encoder
self.table = None

@classmethod
def from_crawler(cls, crawler):
aws_access_key_id = crawler.settings['AWS_ACCESS_KEY_ID']
aws_secret_access_key = crawler.settings['AWS_SECRET_ACCESS_KEY']
region_name = crawler.settings['DYNAMODB_PIPELINE_REGION_NAME']
table_name = crawler.settings['DYNAMODB_PIPELINE_TABLE_NAME']
return cls(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
table_name=table_name
)

def open_spider(self, spider):
db = boto3.resource(
'dynamodb',
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.region_name,
)
self.table = db.Table(self.table_name) # pylint: disable=no-member

def close_spider(self, spider):
self.table = None

def process_item(self, item, spider):
self.table.put_item(
TableName=self.table_name,
Item={k: self.encoder(v) for k, v in item.items()},
)
return item

コメント・シェア

DynamoDBでテーブルを作る

 
カテゴリー AWS Database   タグ

テーブル(暗黙的なインデックス)

PrimaryKey

DynamoDBのプライマリキーはパーティションキーのみのものとソートキーを組み合わせた複合キーがある。

PartitionKey (HashKey)

Amazon DynamoDB はデータをパーティションに保存します。パーティションは、ソリッドステートドライブ (SSD) によってバックアップされ、AWS リージョン内の複数のアベイラビリティーゾーン間で自動的にレプリケートされる、テーブル用のストレージの割り当てです。
一般的に言えば、テーブルとそのセカンダリインデックスの論理的なすべてのパーティションキー全体でアクティビティが均一になるようにアプリケーションを設計する必要があります。

パーティションキーのみのプライマリキーの場合、プライマリキーで一意にレコードを特定する。

SortKey (RangeKey)

Amazon DynamoDB テーブルで、テーブルの各項目を一意に識別する主キーは、パーティションキーだけでなくソートキーから構成されている場合があります。

設計が優れたソートキーには、2 つの主な利点があります。

関連情報を 1 つの場所にまとめて、効率的にクエリを実行することができます。ソートキーを慎重に設計することで、begins_with、between、>、< などの演算子による範囲のクエリを使用して、一般的に必要な関連項目のグループを検索することができます。

複合ソートキーを作成すれば、データの階層的 (1 対多) な関係を定義して、任意の階層レベルでクエリを実行することができます。

パーティションキー+ソートキーのプライマリキーの場合、この組合せで一意にレコードを特定する。
また、ソートキーにより範囲検索が可能になる。

インデックス

LocalSecondaryIndexes (LSI)

local secondary index は特定のパーティションキー値の代替ソートキーを維持します。また local secondary index には、ベーステーブルの一部またはすべての属性のコピーが含まれます。テーブルを作成する際に、local secondary index に射影する属性を指定します。local secondary indexのデータは、ベーステーブルと同じパーティションキーと、異なるソートキーで構成されます。これにより、この異なるディメンションにわたってデータ項目に効率的にアクセスできます。クエリまたはスキャンの柔軟性を向上するために、テーブルごとに最大 5 つのlocal secondary indexを作成できます。

テーブルとは異なるソートキーを持つインデックス。最大5個まで作成できる。

local secondary index は、すべて次の条件を満たす必要があります。
パーティションキーはそのベーステーブルのパーティションキーと同じである。
ソートキーは完全に 1 つのスカラー属性で構成されている。
ベーステーブルのソートキーがインデックスに射影され、非キー属性として機能する。

パーティションキーは同じで、テーブルのソートキーは非キー属性として機能する。

ローカルセカンダリインデックスがあるテーブルには、パーティションキーの値ごとに 10 GB のサイズ制限があります。ローカルセカンダリインデックスがあるテーブルには、1 つのパーティションキー値の合計サイズが 10 GB を超えない限り、任意の数の項目を格納できます。

GlobalSecondaryIndexes (GSI)

非キー属性に対するクエリの速度を上げるために、グローバルセカンダリインデックス を作成できます。グローバルセカンダリインデックスには、ベーステーブルからの属性の一部が格納されますが、テーブルのプライマリキーとは異なるプライマリキーによって構成されます。インデックスキーは、テーブルからのキー属性を持つ必要がありません。また、テーブルと同じキースキーマを使用する必要もありません。

テーブルとは異なるパーティションキーを持つインデックス。

すべてのグローバルセカンダリインデックスには、パーティションキーが必要で、オプションのソートキーを指定できます。インデックスキースキーマは、テーブルスキーマとは異なるものにすることができます。

テーブルスキーマと異なるパーティションキーとソートキーを利用することができる。

DynamoDB テーブルでは、各キー値は一意である必要があります。ただし、グローバルセカンダリインデックス のキー値は一意である必要はありません。

グローバルセカンダリーインデックスのキーは一意でなくていい。

簡単なテーブルを作成する

dynamodb.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
AWSTemplateFormatVersion: "2010-09-09"
Description: DynamoDB

Metadata:
# ------------------------------------------------------------ #
# Input Parameters
# ------------------------------------------------------------ #
"AWS::CloudFormation::Interface":
ParameterGroups:
- Label:
default: "Dynamo DB"
Parameters:
- TableName

ParameterLabels:
TableName:
default: "myTable"

Parameters:
TableName:
Type: String

Resources:
# ------------------------------------------------------------ #
# DynamoDB
# ------------------------------------------------------------ #
Resources:
DDBTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Ref TableName
AttributeDefinitions:
-
AttributeName: "ArtistId"
AttributeType: "S"
-
AttributeName: "Concert"
AttributeType: "S"
-
AttributeName: "TicketSales"
AttributeType: "S"
KeySchema:
-
AttributeName: "ArtistId"
KeyType: "HASH"
-
AttributeName: "Concert"
KeyType: "RANGE"
GlobalSecondaryIndexes:
-
IndexName: "GSI"
KeySchema:
-
AttributeName: "TicketSales"
KeyType: "HASH"
Projection:
ProjectionType: "KEYS_ONLY"
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5

# ------------------------------------------------------------ #
# Output Parameters
# ------------------------------------------------------------ #
Outputs:
DynamoDBTable:
Value: !Ref DDBTable

parameters.json

ホストするドメイン名

1
2
3
4
5
6
[
{
"ParameterKey": "TableName",
"ParameterValue": "mytesttable"
}
]

CloudFormationでテーブルを作成する

1
2
3
4
$ aws cloudformation create-stack --stack-name tutorial-dynamodb --template-body "file://./dynamodb.yml" --parameters "file://./parameters.json"
{
"StackId": "arn:aws:cloudformation:ap-northeast-1:XXXXXXXXXXX:stack/tutorial-dynamodb/XXXXXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}

作成されたテーブル

AWS Consoleのダッシュボードでキャパシティを確認できる。
今回の例ではテーブルとインデックスで、読み込み書き込みそれぞれ5ユニット指定しているため、10になっている。

DynamoDB Table width=640

テーブルには作成されたテーブルが表示されている。

DynamoDB Table width=640

項目でテーブルの内容を確認できる。今回の例ではインデックス以外の属性が見えている。

DynamoDB Table width=640

キャパシティータブでは設定したキャパシティの内訳を見ることができる。

DynamoDB Table width=640

作成したインデックス。

DynamoDB Table width=640

Table

AttributeDefinitions

AttributeType
属性のデータ型。
S - 属性は文字列型
N - 属性は数値型
B - 属性はバイナリ型

KeySchema

KeyType
キー属性が担うロール:
HASH - パーティションキー
RANGE - ソートキー

LocalSecondaryIndex / GlobalSecondaryIndex

コメント・シェア

DynamoDBのキャパシティモード

 
カテゴリー AWS Database   タグ

キャパシティモード

Amazon DynamoDB には、テーブルで読み込みおよび書き込みを処理するための読み込み/書き込みキャパシティーモードが 2 つあります。

  • オンデマンド
  • プロビジョニング済み (デフォルト、無料利用枠の対象)

無料枠

25 GB のストレージ
25 個のプロビジョニングされた書き込みキャパシティーユニット (WCU)
25 個のプロビジョニングされた読み込みキャパシティーユニット (RCU)
1 か月あたり最大 2 億リクエストの処理が十分に可能。

料金

DynamoDB では、DynamoDB テーブル内のデータの読み取り、書き込み、保存に加え、お客様が有効化したオプション機能が課金の対象となります。DynamoDB には「オンデマンド」と「プロビジョニング済み」という 2 種類のキャパシティーモードがあり、それぞれのモードにおけるテーブルの読み書き処理について別個の請求オプションがあります。

オンデマンド(on-demand capacity mode)とプロビジョニング済(provisioned capacity mode)で料金が異なる。

オンデマンドキャパシティーモードを利用している場合、料金は、アプリケーションがテーブルにおいて実行したデータの読み込み/書き込みリクエストに対して発生します。ワークロードの拡大や縮小は DynamoDB によってその場で対応されるため、お客様はアプリケーションの読み込み/書き込みスループットの予測を指定する必要がありません。

オンデマンドは従量課金でリクエストに対する課金。

プロビジョニングされたキャパシティーのモードでは、アプリケーションに必要な 1 秒あたりのデータ読み込みと書き込みの回数を指定します。Auto Scaling を使用すれば、指定した利用率に応じてテーブルのキャパシティーが自動的に調整されるので、アプリケーションのパフォーマンスを確保しつつコストを削減できます。

プロビジョニングは事前にキャパシティユニットとしてリソースを定義する。

on-demand capacity mod

DynamoDB テーブルにオンデマンドキャパシティーモードを選択している場合、アプリケーションが実行する読み込みと書き込みに対してのみ課金されます。テーブルのスループット容量を管理することなく、必要に応じて API コールを実行できます。DynamoDB では、ワークロードで一貫性と低レイテンシーを実現できるよう、ハードウェアリソースが自動的に管理されます。書き込み (1 KB まで) については、1 回につき書き込みリクエストが 1 単位発生し、トランザクション書き込みでは 1 回につき書き込みリクエストが 2 単位発生します。読み込みについては、強力な整合性のある読み込み (4 KB まで) 1 回につき 1 単位、トランザクション読み込み 1 回につき 2 単位、結果整合性のある読み込み 1 回につき 0.5 単位の読み込みリクエストが発生します。

要求単位(request)で料金が決まる。

  • 標準(standard)
    • 1KB * 1回 / 1単位
  • トランザクション(transactional)
    • 1KB * 1回 / 2単位

読み込みの場合は、整合性によって料金が異なる。

  • 強力な整合性(strongly consistent)
    • 4KB*1回 = 1単位
  • 結果整合性(eventually consistent)
    • 4KB*1回 = 0.5単位
  • トランザクション(transactional)
    • 4KB*1回 = 2単位

リージョン:
料金タイプ 料金
書き込み要求単位 書き込み要求ユニット 100 万あたり 1.4269USD
読み出し要求単位 書き込み要求ユニット 100 万あたり 0.285USD

書き込み要求は高い。2020年5月13日時点で、AWSのサイトの日本語表記が間違っているが、読み出し要求単位の行は書き込みではなく読み込み(英語表示はreadになっている)。

provisioned capacity mode

読み込みキャパシティーユニット (RCU): テーブルからデータを読み込むための各 API コールを読み込み要求といいます。読み込み要求は、強力な整合性のある読み込み、結果整合性のある読み込み、またはトランザクション読み込みとなります。項目のサイズが 4 KB までなら、RCU 1 個で、強力な整合性のある読み込み要求を 1 秒あたり 1 回実行できます。項目が 4 KB より大きい場合、追加の RCU が必要です。項目のサイズが 4 KB までなら、RCU 1 個で、結果整合性のある読み込み要求を 1 秒あたり 2 回実行できます。トランザクション読み込み要求では、4 KB までの項目を 1 秒あたり 1 回読み込むのに RCU 2 個が必要です。例えば、8 KB の項目であれば、強力な整合性のある読み込みには RCU 2 個、結果整合性のある読み込みには RCU 1 個、トランザクション読み込みには RCU 4 個がそれぞれ必要になります。

  • 強力な整合性(strongly consistent)
    • 4KB*1回/1sec = 1RCU
  • 結果整合性(eventually consistent)
    • 4KB*2回/1sec = 1RCU
  • トランザクション(transactional)
    • 4KB*1回/1sec = 2RCU

書き込みキャパシティーユニット (WCU): テーブルにデータを書き込むための各 API コールを書き込み要求といいます。項目のサイズが 1 KB までなら、WCU 1 個で、標準の書き込み要求を 1 秒あたり 1 回実行できます。項目が 1 KB より大きい場合、追加の WCU が必要です。トランザクション書き込み要求では、1 KB までの項目を 1 秒あたり 1 回書き込むのに WCU 2 個が必要です。たとえば、1 KB の項目の標準書き込み要求には WCU 1 個、3 KB の項目の標準書き込み要求には WCU 3 個、3 KB の項目のトランザクション書き込み要求には WCU 6 個が必要になります。

  • 標準(standard)
    • 1KB*1回/1sec = 1WCU
  • トランザクション(transactional)
    • 1KB*1回/1sec = 2WCU

リージョン:
プロビジョニングするスループットタイプ 時間あたりの料金
書き込みキャパシティーユニット (WCU) 0.000742USD/WCU
読み込みキャパシティーユニット (RCU) 0.0001484USD/RCU

割当てたキャパシティユニットを越えた場合

ProvisionedThroughputExceededException
メッセージ : 1 つのテーブルまたは 1 つ以上のグローバルセカンダリインデックスのプロビジョンドスループットが許容されている最大値を超えました。プロビジョンドスループットと消費スループットのパフォーマンスメトリクスを表示するには、Amazon CloudWatch コンソールを参照してください。

ProvisionedThroughputExceededExceptionがスローされる。

コメント・シェア

cron構文を使用したワークフローの実行

POSIX クーロン構文を使用して、特定の UTC 時間にワークフローを実行できるようスケジュール設定できます。 スケジュールしたワークフローは、デフォルトまたはベースブランチの直近のコミットで実行されます。 スケジュールされたワークフローを実行できる最短のインターバルは5分ごとです。

1
2
3
4
on:
schedule:
# * はYAMLに置ける特殊文字なので、この文字列は引用符で囲まなければならない
- cron: '*/15 * * * *'

よくあるcron設定。

1
2
3
4
5
6
7
8
9
┌───────────── 分 (0 - 59)
│ ┌───────────── 時間 (0 - 23)
│ │ ┌───────────── 日 (1 - 31)
│ │ │ ┌───────────── 月 (1 - 12 または JAN-DEC)
│ │ │ │ ┌───────────── 曜日 (0 - 6 または SUN-SAT)
│ │ │ │ │
│ │ │ │ │
│ │ │ │ │
* * * * *

注釈: GitHub Actions は、非標準的構文 (@yearly、@monthly、@weekly、@daily、@hourly、@reboot) をサポートしていません。

深夜2時にワークフローを実行する例

ワークフロー例

1
2
3
4
5
6
7
8
9
10
11
12
on:
schedule:
# At 02:00.(JST) → At 17:00.(UTC)
- cron: '0 17 * * *'

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
# This workflow contains a single job called "build"
build:
# The type of runner that the job will run on
runs-on: ubuntu-latest
timeout-minutes: 10

UTC時間に注意

GitHub ActionsでScheduleに使用できるcronはUTC時間で実行されるので、JST(+9:00)との差を考慮して設定が必要。

crontab guru を使うと、クーロン構文の生成および実行時間の確認に役立ちます。 また、クーロン構文の生成を支援するため、crontab guru のサンプルリストもあります。

GitHubのリファレンスで紹介されているcrontab.guruが使いやすい。

crontabguru width=640

実行時のタイムアウトを設定する

GitHub Actionsのプライベートリポジトリでの制限時間枠を浪費しないため、深夜に起動したジョブが終了しない場合のタイムアウトを設定する。

jobs..steps.timeout-minutes
プロセスがkillされるまでにステップが実行できる最大の分数。

jobs..timeout-minutes
GitHubで自動的にキャンセルされるまでジョブを実行する最長時間 (分)。 デフォルト: 360

デフォルトではjob全体でのtimeout-minutesは360分が設定されている。

Jobの実行結果をもとに適当な値を設定する。

GithubActions width=640

タイムアウトするとジョブはキャンセルされる

GithubActions width=640

コメント・シェア

GitHub Actionsとdocker-compose

GitHub Actionsではdockerhubのイメージを使ったワークフローが組めるが、docker-composeも使える。ローカル開発時はdocker-composeで制御することが多いので、そのまま活用できるのはシンプルで強力。

ワークフローの書き方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# This is a basic workflow to help you get started with Actions
name: docker-compose

# Controls when the action will run. Triggers the workflow on push or pull request
# events but only for the master branch
on:
push:
branches:
- master

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
# This workflow contains a single job called "build"
build:
# The type of runner that the job will run on
runs-on: ubuntu-latest

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v2

# Runs a single command using the runners shell
- name: compose-run
shell: bash
env:
ENVENV1: ${{ secrets.ENVENV1 }}
ENVENV2: ${{ secrets.ENVENV2 }}
run: |
docker-compose up -d
docker-compose exec -T app bash xxxxxxxx.sh

環境変数とシークレット

docker-composeのパラメーター定義

Dockerに渡すパラメーターは環境変数としてdocker-compose.ymlで次の定義をする。

1
2
3
4
environment:
PYTHONDONTWRITEBYTECODE: 1
ENVENV1: ${ENVENV1}
ENVENV2: ${ENVENV2}

ローカル開発中のパラメーター

docker-compose.ymlはGitリポジトリに含めるため、公開すべきでないパラメーターは.envファイルに記述する。
.envファイルは.gitignoreで管理対象外として定義しておく。

1
2
ENVENV1=xxxxxxxxxxxxxx
ENVENV2=yyyyyyyyyyyyyy

GitHub Actionsでのパラメーター

GitHub Actionsでは.envのかわりに、リポジトリの設定でSecretsを使用して定義する。
たとえばSecretsに定義したパラメーターをGitHub Actionsで参照するのは以下の定義になる。

1
2
3
env:
ENVENV1: ${{ secrets.ENVENV1 }}
ENVENV2: ${{ secrets.ENVENV2 }}

GitHub Secrets width=640

ワークフローでdocker-composeを使う

docker-composeの起動方法

そのままdocker-composeコマンドが使用できる。

docker-compose内で定義されたDBなどの依存するサービスはdocker-compose up -dで起動し、docker-compose exec -T app bash xxxxxx.shでメインのコードを実行する。

  • docker-compose up -dはサービスを起動して、コマンド自体は終了
  • docker-compose execでバックグラウンド起動したサービスにAttachし目的のコマンドを実行

the input device is not a TTY

docker-compose exec -T app bash xxxxxxxx.shの実行でエラーになった。

1
2
the input device is not a TTY
##[error]Process completed with exit code 1.

解決方法は2つある

  1. 環境変数COMPOSE_INTERACTIVE_NO_CLI=1を使用する
  2. docker-compose exec -Tで起動する

コメント・シェア

Scrapyでファイルをダウンロードして保存する

scrapyで複数ページを巡回はCrawlSpider、ファイルのダウンロードはFilesPipelineを使うと簡潔に記述できる。
FilesPipelineはデフォルトではSha1ハッシュをファイル名にする実装なので、カスタマイズが必要。
ソースコードは簡潔で読みやすいので継承してカスタマイズするのは容易。

CrawlSpider

要約すると、ポイントは以下

  • 巡回対象のページをrulesLinkExtractorで抽出
  • コールバックで抽出したページからアイテムを抽出

FilesPipeline

要約すると、ポイントは以下

  • settings.pyのFILES_STOREFILES_STOREによるダウンロード先ディレクトリを指定
  • settings.pyのITEM_PIPELINESFilesPipelineを有効化
  • 生成するアイテムにfile_urls属性を追加し、ダウンロードするファイルのURLsを指定
  • 生成するアイテムにダウンロード結果を保存するfiiles属性を追加する

Using the Files Pipeline

The typical workflow, when using the FilesPipeline goes like this:

In a Spider, you scrape an item and put the URLs of the desired into a file_urls field.

The item is returned from the spider and goes to the item pipeline.

When the item reaches the FilesPipeline, the URLs in the file_urls field are scheduled for download using the standard Scrapy scheduler and downloader (which means the scheduler and downloader middlewares are reused), but with a higher priority, processing them before other pages are scraped. The item remains “locked” at that particular pipeline stage until the files have finish downloading (or fail for some reason).

When the files are downloaded, another field (files) will be populated with the results. This field will contain a list of dicts with information about the downloaded files, such as the downloaded path, the original scraped url (taken from the file_urls field) , and the file checksum. The files in the list of the files field will retain the same order of the original file_urls field. If some file failed downloading, an error will be logged and the file won’t be present in the files field.

Spiderでスクレイピングし、目的のURLをfile_urlsにセットすると、SchedulerとDownloaderを使ってスケジューリングされるが、優先度が高く他のページをスクレイピングする前に処理される。ダウンロード結果はfilesに記録する。

Enabling your Media Pipeline

To enable your media pipeline you must first add it to your project ITEM_PIPELINES setting.

For Images Pipeline, use:

ITEM_PIPELINES = {‘scrapy.pipelines.images.ImagesPipeline’: 1}
For Files Pipeline, use:

ITEM_PIPELINES = {‘scrapy.pipelines.files.FilesPipeline’: 1}

ITEM_PIPELINESでscrapy.pipelines.files.FilesPipeline': 1を指定して有効化する。
画像ファイルのためのImagesPipelineもある。

Supported Storage - File system storage

The files are stored using a SHA1 hash of their URLs for the file names.

ファイル名はSHA1ハッシュを使用する

IPAの情報処理試験のページをサンプルにCrawlSpiderを試す

対象のページ構造

起点となるページは各年度の過去問ダウンロードページへのリンクになっている。

IPAのページ width=640

各ページは試験区分ごとに過去問のPDFへのリンクがある。

IPAのページ width=640

project

https://www.jitec.ipa.go.jp/1_04hanni_sukiru/_index_mondai.html以下のページを巡回してPDFをダウンロードするプロジェクトを作成する。
Spiderのスケルトンを作成する際に-t crawlを指定し、CrawlSpiderのスケルトンを作成する。

1
2
3
scrapy startproject <プロジェクト名>
cd <プロジェクト名>
scrapy genspider -t crawl ipa www.ipa.go.jp

spiders/ipa.py

rulesで各年度の過去問ダウンロードページを抽出し、各ページを解析してPDF単位でアイテム化する。
file_urlsは複数指定できるが、ここでは1ファイル毎で指定している。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# -*- coding: utf-8 -*-
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from crawldownload.items import CrawldownloadItem

class IpaSpider(CrawlSpider):
name = 'ipa'
allowed_domains = ['ipa.go.jp']
start_urls = ['https://www.jitec.ipa.go.jp/1_04hanni_sukiru/_index_mondai.html']

rules = (
Rule(LinkExtractor(allow=r'1_04hanni_sukiru/mondai_kaitou'), callback='parse_item', follow=True),
)

def parse_item(self, response):
logger.info("{}".format(response.css('title::text').get()))

for main_area in response.css('#ipar_main'):
exam_seasons = main_area.css('h3').xpath('string()').extract()

season = 0
for exam_table in main_area.css('div.unit'):
exam_season = exam_seasons[season]
season+=1

# ページ内のPDFファイルのアイテムを生成
for exam_item in exam_table.css('tr'):
# リンクを含まないヘッダ部なので除く
if exam_item.css('a').get() is None:
continue

for exam_link in exam_item.css('a'):
exam_pdf = response.urljoin(exam_link.css('a::attr(href)').get())

item = CrawldownloadItem()
item['season'] = exam_season
item['title'] = exam_item.css('td p::text').getall()[1].strip()
item['file_title'] = exam_link.css('a::text').get()
item['file_urls'] = [ exam_pdf ]
yield item

items.py

files_urlsfiles属性がFilesPipelineで必要になる属性

1
2
3
4
5
6
7
8
import scrapy

class CrawldownloadItem(scrapy.Item):
season = scrapy.Field()
title = scrapy.Field()
file_title = scrapy.Field()
file_urls = scrapy.Field()
files = scrapy.Field()

pipelines.py

FilesPipelineはデフォルトでSHA1ハッシュのファイル名を使用するので、継承したクラスでfile_path()メソッドをオーバーライドする。
存在しないディレクトリも自動生成されるので、保存したいパスを生成して返せばいい。

1
2
3
4
5
6
7
8
9
10
11
12
from scrapy.pipelines.files import FilesPipeline

import os

class CrawldownloadPipeline(FilesPipeline):
def file_path(self, request, response=None, info=None):
file_paths = request.url.split("/")
file_paths.pop(0) # https:
file_paths.pop(0) #//
file_name = os.path.join(*file_paths)

return file_name
1
2
3
response.url="https://www.jitec.ipa.go.jp/1_04hanni_sukiru/mondai_kaitou_2019h31_2/2019r01a_sg_am_qs.pdf"
↓↓↓
file_name="www.jitec.ipa.go.jp/1_04hanni_sukiru/mondai_kaitou_2019h31_2/2019r01a_sg_am_qs.pdf"

setting.py

FilesPipelineを有効化する。

  • FILES_STOREでダウンロード先ディレクトリを指定
  • ITEM_PIPELINESFilesPipelineを有効化

デフォルト設定では多重度が高すぎるので、調整する。

  • 同時アクセスは1
  • ダウンロード間隔3秒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Obey robots.txt rules
#ROBOTSTXT_OBEY = True
ROBOTSTXT_OBEY = False

# Configure maximum concurrent requests performed by Scrapy (default: 16)
#CONCURRENT_REQUESTS = 32
CONCURRENT_REQUESTS = 1

# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
#DOWNLOAD_DELAY = 3
DOWNLOAD_DELAY = 3

…略…

FILES_STORE = 'download'

ITEM_PIPELINES = {
#'scrapy.pipelines.files.FilesPipeline': 1,
'crawldownload.pipelines.CrawldownloadPipeline': 1,
}

コメント・シェア

Scrapyのcrawlでコマンドライン引数を処理する

 
カテゴリー Python   タグ

クローラーへのコマンドラインオプションの渡し方

scrapy crawl myspider -a category=electronicsのように-aオプションで渡す。

コンストラクタを実装する

1
2
3
4
5
6
7
8
9
10
11
Spiders can access arguments in their __init__ methods:

import scrapy

class MySpider(scrapy.Spider):
name = 'myspider'

def __init__(self, category=None, *args, **kwargs):
super(MySpider, self).__init__(*args, **kwargs)
self.start_urls = ['http://www.example.com/categories/%s' % category]
# ...

デフォルトコンストラクタを使用する

The default init method will take any spider arguments and copy them to the spider as attributes. The above example can also be written as follows:

デフォルトでは属性値として設定される。

1
2
3
4
5
6
7
import scrapy

class MySpider(scrapy.Spider):
name = 'myspider'

def start_requests(self):
yield scrapy.Request('http://www.example.com/categories/%s' % self.category)

コメント・シェア

ScrapyとSplashでのセッションハンドリング

 
カテゴリー Lua Python   タグ

Splashのセッションハンドリング

Splashのみで利用する場合はSelenium同様、内部的に動作するHeadlessブラウザ(Chromium)がセッションハンドリングを行うため、同一のLuaスクリプト内で記述する範囲では意識しなくてもステートは維持されている。

ScrapyとSplashの間

SplashはScrapyからのリクエスト毎にステートレスなので、ScrapyとLuaスクリプトの間でセッションハンドリングが必要になる。
scrapy-splashに説明がある。

セッションハンドリング

Splash itself is stateless - each request starts from a clean state. In order to support sessions the following is required:

  1. client (Scrapy) must send current cookies to Splash;
  2. Splash script should make requests using these cookies and update them from HTTP response headers or JavaScript code;
  3. updated cookies should be sent back to the client;
  4. client should merge current cookies wiht the updated cookies.

For (2) and (3) Splash provides splash:get_cookies() and splash:init_cookies() methods which can be used in Splash Lua scripts.

Splashはステートレスなので、状態を維持するためのコーディングが必要。

  1. ScrapyからSplashにCookieを送らなくてはならない
  2. SplashスクリプトはCookieを使って操作し、Cookieをアップデートする
  3. アップデートしたCookieをScrapyに返す
  4. Scrapyは受け取ったCookieをマージする

scrapy-splash provides helpers for (1) and (4): to send current cookies in ‘cookies’ field and merge cookies back from ‘cookies’ response field set request.meta[‘splash’][‘session_id’] to the session identifier. If you only want a single session use the same session_id for all request; any value like ‘1’ or ‘foo’ is fine.

scrapy-splashが自動的にCookie情報をセッション識別子としてrequest.meta['splash']['session_id']にマージする。

For scrapy-splash session handling to work you must use /execute endpoint and a Lua script which accepts ‘cookies’ argument and returns ‘cookies’ field in the result:

このセッションハンドリングを有効にするには/executeエンドポイントを使用し、cookiesパラメーターを使用する処理をLuaスクリプトで実装する必要がある。

1
2
3
4
5
6
7
8
9
10
function main(splash)
splash:init_cookies(splash.args.cookies)

-- ... your script

return {
cookies = splash:get_cookies(),
-- ... other results, e.g. html
}
end

SplashRequest sets session_id automatically for /execute endpoint, i.e. cookie handling is enabled by default if you use SplashRequest, /execute endpoint and a compatible Lua rendering script.

SplashRequestで/executeエンドポイントを使い、適切なLuaスクリプトを記述すれば、セッションハンドリングを実装することができる。

Splash経由でのresponseの構造

All these responses set response.url to the URL of the original request (i.e. to the URL of a website you want to render), not to the URL of the requested Splash endpoint. “True” URL is still available as response.real_url.
plashJsonResponse provide extra features:

  • response.data attribute contains response data decoded from JSON; you can access it like response.data[‘html’].
  • If Splash session handling is configured, you can access current cookies as response.cookiejar; it is a CookieJar instance.
  • If Scrapy-Splash response magic is enabled in request (default), several response attributes (headers, body, url, status code) are set automatically from original response body:
    • response.headers are filled from ‘headers’ keys;
    • response.url is set to the value of ‘url’ key;
    • response.body is set to the value of ‘html’ key, or to base64-decoded value of ‘body’ key;
    • response.status is set from the value of ‘http_status’ key.
  • response.urlはレンダリングするページのURLが設定される
  • response.real_urlはSplashのURL(http://splash:8050/execute)となる
  • response.dataでSplashから返却したデータにアクセスできる
  • Cookieはresponse.cookiejarでアクセスすることができる。
  • Scrapy-Splash response magicで自動的にレンダリングしたページからの応答が設定される

セッションハンドリングのサンプルコード

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import scrapy
from scrapy_splash import SplashRequest

script = """
function main(splash)
splash:init_cookies(splash.args.cookies)
assert(splash:go{
splash.args.url,
headers=splash.args.headers,
http_method=splash.args.http_method,
body=splash.args.body,
})
assert(splash:wait(0.5))

local entries = splash:history()
local last_response = entries[#entries].response
return {
url = splash:url(),
headers = last_response.headers,
http_status = last_response.status,
cookies = splash:get_cookies(),
html = splash:html(),
}
end
"""

class MySpider(scrapy.Spider):


# ...
yield SplashRequest(url, self.parse_result,
endpoint='execute',
cache_args=['lua_source'],
args={'lua_source': script},
headers={'X-My-Header': 'value'},
)

def parse_result(self, response):
# here response.body contains result HTML;
# response.headers are filled with headers from last
# web page loaded to Splash;
# cookies from all responses and from JavaScript are collected
# and put into Set-Cookie response header, so that Scrapy
# can remember them.

リクエストで注目するポイント

重要なポイントは/executeエンドポイントを使用していること。
argsでLuaスクリプトやパラメーターをSplashに渡す。

1
2
3
4
5
6
yield SplashRequest(url, self.parse_result,
endpoint='execute',
cache_args=['lua_source'],
args={'lua_source': script},
headers={'X-My-Header': 'value'},
)

SplashRequestで渡したパラメーターを使用してCookieを初期化。

1
2
3
4
5
6
7
8
splash:init_cookies(splash.args.cookies)
assert(splash:go{
splash.args.url,
headers=splash.args.headers,
http_method=splash.args.http_method,
body=splash.args.body,
})
assert(splash:wait(0.5))

レスポンスで注目するポイント

最後のレスポンスのヘッダー情報やCookieを返却。

1
2
3
4
5
6
7
8
9
local entries = splash:history()
local last_response = entries[#entries].response
return {
url = splash:url(),
headers = last_response.headers,
http_status = last_response.status,
cookies = splash:get_cookies(),
html = splash:html(),
}

コメント・シェア



nullpo

めも


募集中


Japan