Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

565

566

567

568

569

570

571

572

573

574

575

576

577

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595

596

597

598

599

600

601

602

603

604

605

606

607

608

609

610

611

612

613

614

615

616

617

618

619

620

621

622

623

624

625

626

627

628

629

630

631

632

633

634

635

636

637

638

639

640

641

642

643

644

645

646

647

648

649

650

651

652

653

654

655

656

657

658

659

660

661

662

663

664

665

666

667

668

669

670

671

672

673

674

675

676

677

678

679

680

681

682

683

684

685

686

687

688

689

690

691

692

693

694

695

696

697

698

699

700

701

702

703

704

705

706

707

708

709

710

711

712

713

714

715

716

717

718

719

720

721

722

723

724

725

726

727

728

729

730

731

732

733

734

735

736

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752

753

754

755

756

757

758

759

760

761

762

763

764

765

766

767

768

769

770

771

772

773

774

775

776

777

778

779

780

781

782

783

784

785

786

787

788

789

790

791

792

793

794

795

796

797

798

799

800

801

802

803

804

805

806

807

808

809

810

811

812

813

814

815

816

817

818

819

820

821

822

823

824

825

826

827

828

829

830

831

832

833

834

835

836

837

838

839

840

841

842

843

844

845

846

847

848

849

850

851

852

853

854

855

856

857

858

859

860

861

862

863

864

865

866

867

868

869

870

871

872

873

874

875

876

877

878

879

880

881

882

883

884

885

886

887

888

889

890

891

892

893

894

895

896

897

898

899

900

901

902

903

904

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

921

922

923

924

925

926

927

928

929

930

931

932

933

934

935

936

937

938

939

940

941

942

943

944

945

946

947

948

949

950

951

952

953

954

955

956

957

958

959

960

961

962

963

964

965

966

967

968

969

970

971

972

973

974

975

976

977

978

979

980

981

982

983

984

985

986

987

988

989

990

991

992

993

994

995

996

997

998

999

1000

1001

1002

1003

1004

1005

1006

1007

1008

1009

1010

1011

1012

1013

1014

1015

1016

1017

1018

1019

1020

1021

1022

1023

1024

1025

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1048

1049

1050

1051

1052

1053

1054

1055

1056

1057

1058

1059

1060

1061

1062

1063

1064

1065

1066

1067

1068

1069

1070

1071

1072

1073

1074

1075

1076

1077

1078

1079

1080

1081

1082

1083

1084

1085

1086

1087

1088

1089

1090

1091

1092

1093

1094

1095

1096

1097

1098

1099

1100

1101

1102

1103

1104

1105

1106

1107

1108

1109

1110

1111

1112

1113

1114

1115

1116

1117

1118

1119

1120

1121

1122

1123

1124

1125

1126

1127

1128

1129

1130

1131

1132

1133

1134

1135

1136

1137

1138

1139

1140

1141

1142

1143

1144

1145

1146

1147

1148

1149

1150

1151

1152

1153

1154

1155

1156

1157

1158

1159

1160

1161

1162

1163

1164

1165

1166

1167

1168

1169

1170

1171

1172

1173

1174

1175

1176

1177

1178

1179

1180

1181

1182

1183

1184

1185

1186

1187

1188

1189

1190

1191

1192

1193

1194

1195

1196

1197

1198

1199

1200

1201

1202

1203

1204

1205

1206

1207

1208

1209

1210

1211

1212

1213

1214

1215

1216

1217

1218

1219

1220

1221

1222

1223

1224

1225

1226

1227

1228

1229

1230

1231

1232

1233

1234

1235

1236

1237

1238

1239

1240

1241

1242

1243

1244

1245

1246

1247

1248

1249

1250

1251

1252

1253

1254

1255

1256

1257

1258

1259

1260

1261

1262

1263

1264

1265

1266

1267

1268

1269

1270

1271

1272

1273

1274

1275

1276

1277

1278

1279

1280

1281

1282

1283

1284

1285

1286

1287

1288

1289

1290

1291

1292

1293

1294

1295

1296

1297

1298

1299

1300

1301

1302

1303

1304

1305

1306

1307

1308

1309

1310

1311

1312

1313

1314

1315

1316

1317

1318

1319

1320

1321

1322

1323

1324

1325

1326

1327

1328

1329

1330

1331

1332

1333

1334

1335

1336

1337

1338

1339

1340

1341

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1352

1353

1354

1355

1356

1357

1358

1359

1360

1361

1362

1363

1364

1365

1366

1367

1368

1369

1370

1371

1372

1373

1374

1375

1376

1377

1378

1379

1380

1381

1382

1383

1384

1385

1386

1387

1388

1389

1390

1391

1392

1393

1394

1395

1396

1397

1398

1399

1400

1401

1402

1403

1404

1405

1406

1407

1408

1409

1410

1411

1412

1413

1414

1415

1416

1417

1418

1419

1420

1421

1422

1423

1424

1425

1426

1427

1428

1429

1430

1431

1432

1433

1434

1435

1436

1437

1438

1439

1440

1441

1442

1443

1444

1445

1446

1447

1448

1449

1450

1451

1452

1453

1454

1455

1456

1457

1458

1459

1460

1461

1462

1463

1464

1465

1466

1467

1468

1469

1470

1471

1472

1473

1474

1475

1476

1477

1478

1479

1480

1481

1482

1483

1484

1485

1486

1487

1488

1489

1490

1491

1492

1493

1494

1495

1496

1497

1498

1499

1500

1501

1502

1503

1504

1505

1506

1507

1508

1509

1510

1511

1512

1513

1514

1515

1516

1517

1518

1519

1520

1521

1522

1523

1524

1525

1526

1527

1528

1529

1530

1531

1532

1533

1534

1535

1536

1537

1538

1539

1540

1541

1542

1543

1544

1545

1546

1547

1548

1549

1550

1551

1552

1553

1554

1555

1556

1557

1558

1559

1560

1561

1562

1563

1564

1565

1566

1567

1568

1569

1570

1571

1572

1573

1574

1575

1576

1577

1578

1579

1580

1581

1582

1583

1584

1585

1586

1587

1588

1589

1590

1591

1592

1593

1594

1595

1596

1597

1598

1599

1600

1601

1602

1603

1604

1605

1606

1607

1608

1609

1610

1611

1612

1613

1614

1615

1616

1617

1618

1619

1620

1621

1622

1623

1624

1625

1626

1627

1628

1629

1630

1631

1632

1633

1634

1635

1636

1637

1638

1639

1640

1641

1642

1643

1644

1645

1646

1647

1648

1649

1650

1651

1652

1653

1654

1655

1656

1657

1658

1659

1660

1661

1662

1663

1664

1665

1666

1667

1668

1669

1670

1671

1672

1673

1674

1675

1676

1677

1678

1679

1680

1681

1682

1683

1684

1685

1686

1687

1688

1689

1690

1691

1692

1693

1694

1695

1696

1697

1698

1699

1700

1701

1702

1703

1704

1705

1706

1707

1708

1709

1710

1711

1712

1713

1714

1715

1716

1717

1718

1719

1720

1721

1722

1723

1724

1725

1726

1727

1728

1729

1730

1731

1732

1733

1734

1735

1736

1737

1738

1739

1740

1741

1742

1743

1744

1745

1746

1747

1748

1749

1750

1751

1752

1753

1754

1755

1756

1757

1758

1759

1760

1761

1762

1763

1764

1765

1766

1767

1768

1769

1770

1771

1772

1773

1774

1775

1776

1777

1778

1779

1780

1781

1782

1783

1784

1785

1786

1787

1788

1789

1790

1791

1792

1793

1794

1795

1796

1797

1798

1799

1800

1801

1802

1803

1804

1805

1806

1807

1808

1809

1810

1811

1812

1813

1814

1815

1816

1817

1818

1819

1820

1821

1822

1823

1824

1825

1826

1827

1828

1829

1830

1831

1832

1833

1834

1835

1836

1837

1838

1839

1840

1841

1842

1843

1844

1845

1846

1847

1848

1849

1850

1851

1852

1853

1854

1855

1856

1857

1858

1859

1860

1861

1862

1863

1864

1865

1866

1867

1868

1869

1870

1871

1872

1873

1874

1875

1876

1877

1878

1879

1880

1881

1882

1883

1884

1885

1886

1887

1888

1889

1890

1891

1892

1893

1894

1895

1896

1897

1898

1899

1900

1901

1902

1903

1904

1905

1906

1907

1908

1909

1910

1911

1912

1913

1914

1915

1916

1917

1918

1919

1920

1921

1922

1923

1924

1925

1926

1927

1928

1929

1930

1931

1932

1933

1934

1935

1936

1937

1938

1939

1940

1941

1942

1943

1944

1945

1946

1947

1948

1949

1950

1951

1952

1953

1954

1955

1956

1957

1958

1959

1960

1961

1962

1963

1964

1965

1966

1967

1968

1969

1970

1971

1972

1973

1974

1975

1976

1977

1978

1979

1980

1981

1982

1983

1984

1985

1986

1987

1988

1989

1990

1991

1992

1993

1994

1995

1996

1997

1998

1999

2000

2001

2002

2003

2004

2005

2006

2007

2008

2009

2010

2011

2012

2013

2014

2015

2016

2017

2018

2019

2020

2021

2022

2023

2024

2025

2026

2027

2028

2029

2030

2031

2032

2033

2034

2035

2036

2037

2038

2039

2040

2041

2042

2043

2044

2045

2046

2047

2048

2049

2050

2051

2052

2053

2054

2055

2056

2057

2058

2059

2060

2061

2062

2063

2064

2065

2066

2067

2068

2069

2070

2071

2072

2073

2074

2075

2076

2077

2078

2079

2080

2081

2082

2083

2084

2085

2086

2087

2088

2089

2090

2091

2092

2093

2094

2095

2096

2097

2098

2099

2100

2101

2102

2103

2104

2105

2106

2107

2108

2109

2110

2111

2112

2113

2114

2115

2116

2117

2118

2119

2120

2121

2122

2123

2124

2125

2126

2127

2128

2129

2130

2131

2132

2133

2134

2135

2136

2137

2138

2139

2140

2141

2142

2143

2144

2145

2146

2147

2148

2149

2150

2151

2152

2153

2154

2155

2156

2157

2158

2159

2160

2161

2162

2163

2164

2165

2166

2167

2168

2169

2170

2171

2172

2173

2174

2175

2176

2177

2178

2179

2180

2181

2182

2183

2184

2185

2186

2187

2188

2189

2190

2191

2192

2193

2194

2195

2196

2197

2198

2199

2200

2201

2202

2203

2204

2205

2206

2207

2208

2209

2210

2211

2212

2213

2214

2215

2216

2217

2218

2219

2220

2221

2222

2223

2224

2225

2226

2227

2228

2229

2230

2231

2232

2233

2234

2235

2236

2237

2238

2239

2240

2241

2242

2243

2244

2245

2246

2247

2248

2249

2250

2251

2252

2253

2254

2255

2256

2257

2258

2259

2260

2261

2262

2263

2264

2265

2266

2267

2268

2269

2270

2271

2272

2273

2274

2275

2276

2277

2278

2279

2280

2281

2282

2283

2284

2285

2286

2287

2288

2289

2290

2291

2292

2293

2294

2295

2296

2297

2298

2299

2300

2301

2302

2303

2304

2305

2306

2307

2308

2309

2310

2311

2312

2313

2314

2315

2316

2317

2318

2319

2320

2321

2322

2323

2324

2325

2326

2327

2328

2329

2330

2331

2332

2333

2334

2335

2336

2337

2338

2339

2340

2341

2342

2343

2344

2345

2346

2347

2348

2349

2350

2351

2352

2353

2354

2355

2356

2357

2358

2359

2360

2361

2362

2363

2364

2365

2366

2367

2368

2369

2370

2371

2372

2373

2374

2375

2376

2377

2378

2379

2380

2381

2382

2383

2384

2385

2386

2387

2388

2389

2390

2391

2392

2393

2394

2395

2396

2397

2398

2399

2400

2401

2402

2403

2404

2405

2406

2407

2408

2409

2410

2411

2412

2413

2414

2415

2416

2417

2418

2419

2420

2421

2422

2423

2424

2425

2426

2427

2428

2429

2430

2431

2432

2433

2434

2435

2436

2437

2438

2439

2440

2441

2442

2443

2444

2445

2446

2447

2448

2449

2450

2451

2452

2453

2454

2455

2456

2457

2458

2459

2460

2461

2462

2463

2464

2465

2466

2467

2468

2469

2470

2471

2472

2473

2474

2475

2476

2477

2478

2479

2480

2481

2482

2483

2484

2485

2486

2487

2488

2489

2490

2491

2492

2493

2494

2495

2496

2497

2498

2499

2500

2501

2502

2503

2504

2505

2506

2507

2508

2509

2510

2511

2512

2513

2514

2515

2516

2517

2518

2519

2520

2521

2522

2523

2524

2525

2526

2527

2528

2529

2530

2531

2532

2533

2534

2535

2536

2537

2538

2539

2540

2541

2542

2543

2544

2545

2546

2547

2548

2549

2550

2551

2552

2553

2554

2555

2556

2557

2558

2559

2560

2561

2562

2563

2564

2565

2566

2567

2568

2569

2570

2571

2572

2573

2574

2575

2576

2577

2578

2579

2580

2581

2582

2583

2584

2585

2586

2587

2588

2589

2590

2591

2592

2593

2594

2595

2596

2597

2598

2599

2600

2601

2602

2603

2604

2605

2606

2607

2608

2609

2610

2611

2612

2613

2614

2615

2616

2617

2618

2619

2620

2621

2622

2623

2624

2625

2626

2627

2628

2629

2630

2631

2632

2633

2634

2635

2636

2637

2638

2639

2640

2641

2642

2643

2644

2645

2646

2647

2648

2649

2650

2651

2652

2653

2654

2655

2656

2657

2658

2659

2660

2661

2662

2663

2664

2665

2666

2667

2668

2669

2670

2671

2672

2673

2674

2675

2676

2677

2678

2679

2680

2681

2682

2683

2684

2685

2686

2687

2688

2689

2690

2691

2692

2693

2694

2695

2696

2697

2698

2699

2700

2701

2702

2703

2704

2705

2706

2707

2708

2709

2710

2711

2712

2713

2714

2715

2716

2717

2718

2719

2720

2721

2722

2723

2724

2725

2726

2727

2728

2729

2730

2731

2732

2733

2734

2735

2736

2737

2738

2739

2740

2741

2742

2743

2744

2745

2746

2747

2748

2749

2750

2751

2752

2753

2754

2755

2756

2757

2758

2759

2760

2761

2762

2763

2764

2765

2766

2767

2768

2769

2770

2771

2772

2773

2774

2775

2776

2777

2778

2779

2780

2781

2782

2783

2784

2785

2786

2787

2788

2789

2790

2791

2792

2793

2794

2795

2796

2797

2798

2799

2800

2801

2802

2803

2804

2805

2806

2807

2808

2809

2810

2811

2812

2813

2814

2815

2816

2817

2818

2819

2820

2821

2822

2823

2824

2825

2826

2827

2828

2829

2830

2831

2832

2833

2834

2835

2836

2837

2838

2839

2840

2841

2842

2843

2844

2845

2846

2847

2848

2849

2850

2851

2852

2853

2854

2855

2856

2857

2858

2859

2860

2861

2862

2863

2864

2865

2866

2867

2868

2869

2870

2871

2872

2873

2874

2875

2876

2877

2878

2879

2880

2881

2882

2883

2884

2885

2886

2887

2888

2889

2890

2891

2892

2893

2894

2895

2896

2897

2898

2899

2900

2901

2902

2903

2904

2905

2906

2907

2908

2909

2910

2911

2912

2913

2914

2915

2916

2917

2918

2919

2920

2921

2922

2923

2924

2925

2926

2927

2928

2929

2930

2931

2932

2933

2934

2935

2936

2937

2938

2939

2940

2941

2942

2943

2944

2945

2946

2947

2948

2949

2950

2951

2952

2953

2954

2955

2956

2957

2958

2959

2960

2961

2962

2963

2964

2965

2966

2967

2968

2969

2970

2971

2972

2973

2974

2975

2976

2977

2978

2979

2980

2981

2982

2983

2984

2985

2986

2987

2988

2989

2990

2991

2992

2993

2994

2995

2996

2997

2998

2999

3000

3001

3002

3003

3004

3005

3006

3007

3008

3009

3010

3011

3012

3013

3014

3015

3016

3017

3018

3019

3020

3021

3022

3023

3024

3025

3026

3027

3028

3029

3030

3031

3032

3033

3034

3035

3036

3037

3038

3039

3040

3041

3042

3043

3044

3045

3046

3047

3048

3049

3050

3051

3052

3053

3054

3055

3056

3057

3058

3059

3060

3061

3062

3063

3064

3065

3066

3067

3068

3069

3070

3071

3072

3073

3074

3075

3076

3077

3078

3079

3080

3081

3082

3083

3084

3085

3086

3087

3088

3089

3090

3091

3092

3093

3094

3095

3096

3097

3098

3099

3100

3101

3102

3103

3104

3105

3106

3107

3108

3109

3110

3111

3112

3113

3114

3115

3116

3117

3118

3119

3120

3121

3122

3123

3124

3125

3126

3127

3128

3129

3130

3131

3132

3133

3134

3135

3136

3137

3138

3139

3140

3141

3142

3143

3144

3145

3146

3147

3148

3149

3150

3151

3152

3153

3154

3155

3156

3157

3158

3159

3160

3161

3162

3163

3164

3165

3166

3167

3168

3169

3170

3171

3172

3173

3174

3175

3176

3177

3178

3179

3180

3181

3182

3183

3184

3185

3186

3187

3188

3189

3190

3191

3192

3193

3194

3195

3196

3197

3198

3199

3200

3201

3202

3203

3204

3205

3206

3207

3208

3209

3210

3211

3212

3213

3214

3215

3216

3217

3218

3219

3220

3221

3222

3223

3224

3225

3226

3227

3228

3229

3230

3231

3232

3233

3234

3235

3236

3237

3238

3239

3240

3241

3242

3243

3244

3245

3246

3247

3248

3249

3250

3251

3252

3253

3254

3255

3256

3257

3258

3259

3260

3261

3262

3263

3264

3265

3266

3267

3268

3269

3270

3271

3272

3273

3274

3275

3276

3277

3278

3279

3280

3281

3282

3283

3284

3285

3286

3287

3288

3289

3290

3291

3292

3293

3294

3295

3296

3297

3298

3299

3300

3301

3302

3303

3304

3305

3306

3307

3308

3309

3310

3311

3312

3313

3314

3315

3316

3317

3318

3319

3320

3321

3322

3323

3324

3325

3326

3327

3328

3329

3330

3331

3332

3333

3334

3335

3336

3337

3338

3339

3340

3341

3342

3343

3344

3345

3346

3347

3348

3349

3350

3351

3352

3353

3354

3355

3356

3357

3358

3359

3360

3361

3362

3363

3364

3365

3366

3367

3368

3369

3370

3371

3372

3373

3374

3375

3376

3377

3378

3379

3380

3381

3382

3383

3384

3385

3386

3387

3388

3389

3390

3391

3392

3393

3394

3395

3396

3397

3398

3399

3400

3401

3402

3403

3404

3405

3406

3407

3408

3409

3410

3411

3412

3413

3414

3415

3416

3417

3418

3419

3420

3421

3422

3423

3424

3425

3426

3427

3428

3429

3430

3431

3432

3433

3434

3435

3436

3437

3438

3439

3440

3441

3442

3443

3444

3445

3446

3447

3448

3449

3450

3451

3452

3453

3454

3455

3456

3457

3458

3459

3460

3461

3462

3463

3464

3465

3466

3467

3468

3469

3470

3471

3472

3473

3474

3475

3476

3477

3478

3479

3480

3481

3482

3483

3484

3485

3486

3487

3488

3489

3490

3491

3492

3493

3494

3495

3496

3497

3498

3499

3500

3501

3502

3503

3504

3505

3506

3507

3508

3509

3510

3511

3512

3513

3514

3515

3516

3517

3518

3519

3520

3521

3522

3523

3524

3525

3526

3527

3528

3529

3530

3531

3532

3533

3534

3535

3536

3537

3538

3539

3540

3541

3542

3543

3544

3545

3546

3547

3548

3549

3550

3551

3552

3553

3554

3555

3556

3557

3558

3559

3560

3561

3562

3563

3564

3565

3566

3567

3568

3569

3570

3571

3572

3573

3574

3575

3576

3577

3578

3579

3580

3581

3582

3583

3584

3585

3586

3587

3588

3589

3590

3591

3592

3593

3594

3595

3596

3597

3598

3599

3600

3601

3602

3603

3604

3605

3606

3607

3608

3609

3610

3611

3612

3613

3614

3615

3616

3617

3618

3619

3620

3621

3622

3623

3624

3625

3626

3627

3628

3629

3630

3631

3632

3633

3634

3635

3636

3637

3638

3639

3640

3641

3642

3643

3644

3645

3646

3647

3648

3649

3650

3651

3652

3653

3654

3655

3656

3657

3658

3659

3660

3661

3662

3663

3664

3665

3666

3667

3668

3669

3670

3671

3672

3673

3674

3675

3676

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

from __future__ import absolute_import 

from __future__ import division 

from __future__ import print_function 

from __future__ import unicode_literals 

 

from future.standard_library import install_aliases 

 

install_aliases() 

from builtins import str 

from builtins import object, bytes 

import copy 

from collections import namedtuple 

from datetime import datetime, timedelta 

import dill 

import functools 

import getpass 

import imp 

import importlib 

import zipfile 

import jinja2 

import json 

import logging 

import os 

import pickle 

import re 

import signal 

import socket 

import sys 

import textwrap 

import traceback 

import warnings 

from urllib.parse import urlparse 

 

from sqlalchemy import ( 

Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType, 

Index, Float) 

from sqlalchemy import case, func, or_, and_ 

from sqlalchemy.ext.declarative import declarative_base, declared_attr 

from sqlalchemy.dialects.mysql import LONGTEXT 

from sqlalchemy.orm import relationship, synonym 

 

from croniter import croniter 

import six 

 

from airflow import settings, utils 

from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor 

from airflow import configuration 

from airflow.exceptions import AirflowException, AirflowSkipException 

from airflow.utils.dates import cron_presets, date_range as utils_date_range 

from airflow.utils.db import provide_session 

from airflow.utils.decorators import apply_defaults 

from airflow.utils.email import send_email 

from airflow.utils.helpers import ( 

as_tuple, is_container, is_in, validate_key, pprinttable) 

from airflow.utils.logging import LoggingMixin 

from airflow.utils.state import State 

from airflow.utils.timeout import timeout 

from airflow.utils.trigger_rule import TriggerRule 

 

Base = declarative_base() 

ID_LEN = 250 

SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN') 

DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER')) 

XCOM_RETURN_KEY = 'return_value' 

 

Stats = settings.Stats 

 

ENCRYPTION_ON = False 

try: 

from cryptography.fernet import Fernet 

FERNET = Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8')) 

ENCRYPTION_ON = True 

except: 

pass 

 

if 'mysql' in SQL_ALCHEMY_CONN: 

LongText = LONGTEXT 

else: 

LongText = Text 

 

# used by DAG context_managers 

_CONTEXT_MANAGER_DAG = None 

 

 

def clear_task_instances(tis, session, activate_dag_runs=True): 

''' 

Clears a set of task instances, but makes sure the running ones 

get killed. 

''' 

job_ids = [] 

for ti in tis: 

if ti.state == State.RUNNING: 

if ti.job_id: 

ti.state = State.SHUTDOWN 

job_ids.append(ti.job_id) 

# todo: this creates an issue with the webui tests 

#elif ti.state != State.REMOVED: 

# ti.state = State.NONE 

# session.merge(ti) 

else: 

session.delete(ti) 

if job_ids: 

from airflow.jobs import BaseJob as BJ 

for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all(): 

job.state = State.SHUTDOWN 

if activate_dag_runs: 

execution_dates = {ti.execution_date for ti in tis} 

dag_ids = {ti.dag_id for ti in tis} 

drs = session.query(DagRun).filter( 

DagRun.dag_id.in_(dag_ids), 

DagRun.execution_date.in_(execution_dates), 

).all() 

for dr in drs: 

dr.state = State.RUNNING 

dr.start_date = datetime.now() 

 

 

class DagBag(LoggingMixin): 

""" 

A dagbag is a collection of dags, parsed out of a folder tree and has high 

level configuration settings, like what database to use as a backend and 

what executor to use to fire off tasks. This makes it easier to run 

distinct environments for say production and development, tests, or for 

different teams or security profiles. What would have been system level 

settings are now dagbag level so that one system can run multiple, 

independent settings sets. 

 

:param dag_folder: the folder to scan to find DAGs 

:type dag_folder: str 

:param executor: the executor to use when executing task instances 

in this DagBag 

:param include_examples: whether to include the examples that ship 

with airflow or not 

:type include_examples: bool 

:param sync_to_db: whether to sync the properties of the DAGs to 

the metadata DB while finding them, typically should be done 

by the scheduler job only 

:type sync_to_db: bool 

""" 

def __init__( 

self, 

dag_folder=None, 

executor=DEFAULT_EXECUTOR, 

include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'), 

sync_to_db=False): 

 

dag_folder = dag_folder or DAGS_FOLDER 

self.logger.info("Filling up the DagBag from {}".format(dag_folder)) 

self.dag_folder = dag_folder 

self.dags = {} 

self.sync_to_db = sync_to_db 

self.file_last_changed = {} 

self.executor = executor 

self.import_errors = {} 

if include_examples: 

example_dag_folder = os.path.join( 

os.path.dirname(__file__), 

'example_dags') 

self.collect_dags(example_dag_folder) 

self.collect_dags(dag_folder) 

if sync_to_db: 

self.deactivate_inactive_dags() 

 

def size(self): 

""" 

:return: the amount of dags contained in this dagbag 

""" 

return len(self.dags) 

 

def get_dag(self, dag_id): 

""" 

Gets the DAG out of the dictionary, and refreshes it if expired 

""" 

# If asking for a known subdag, we want to refresh the parent 

root_dag_id = dag_id 

if dag_id in self.dags: 

dag = self.dags[dag_id] 

if dag.is_subdag: 

root_dag_id = dag.parent_dag.dag_id 

 

# If the root_dag_id is absent or expired 

orm_dag = DagModel.get_current(root_dag_id) 

if orm_dag and ( 

root_dag_id not in self.dags or 

( 

orm_dag.last_expired and 

dag.last_loaded < orm_dag.last_expired 

) 

): 

# Reprocessing source file 

found_dags = self.process_file( 

filepath=orm_dag.fileloc, only_if_updated=False) 

 

if found_dags and dag_id in [dag.dag_id for dag in found_dags]: 

return self.dags[dag_id] 

elif dag_id in self.dags: 

del self.dags[dag_id] 

return self.dags.get(dag_id) 

 

def process_file(self, filepath, only_if_updated=True, safe_mode=True): 

""" 

Given a path to a python module or zip file, this method imports 

the module and look for dag objects within it. 

""" 

found_dags = [] 

 

# todo: raise exception? 

if not os.path.isfile(filepath): 

return found_dags 

 

try: 

# This failed before in what may have been a git sync 

# race condition 

dttm = datetime.fromtimestamp(os.path.getmtime(filepath)) 

if only_if_updated \ 

and filepath in self.file_last_changed \ 

and dttm == self.file_last_changed[filepath]: 

return found_dags 

 

except Exception as e: 

logging.exception(e) 

return found_dags 

 

mods = [] 

if not zipfile.is_zipfile(filepath): 

if safe_mode and os.path.isfile(filepath): 

with open(filepath, 'rb') as f: 

content = f.read() 

if not all([s in content for s in (b'DAG', b'airflow')]): 

return found_dags 

 

self.logger.debug("Importing {}".format(filepath)) 

org_mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1]) 

mod_name = 'unusual_prefix_' + org_mod_name 

 

if mod_name in sys.modules: 

del sys.modules[mod_name] 

 

with timeout(configuration.getint('core', "DAGBAG_IMPORT_TIMEOUT")): 

try: 

m = imp.load_source(mod_name, filepath) 

mods.append(m) 

except Exception as e: 

self.logger.exception("Failed to import: " + filepath) 

self.import_errors[filepath] = str(e) 

self.file_last_changed[filepath] = dttm 

 

else: 

zip_file = zipfile.ZipFile(filepath) 

for mod in zip_file.infolist(): 

head, tail = os.path.split(mod.filename) 

mod_name, ext = os.path.splitext(mod.filename) 

if not head and (ext == '.py' or ext == '.pyc'): 

if mod_name == '__init__': 

self.logger.warning("Found __init__.{0} at root of {1}". 

format(ext, filepath)) 

 

if safe_mode: 

with zip_file.open(mod.filename) as zf: 

self.logger.debug("Reading {} from {}". 

format(mod.filename, filepath)) 

content = zf.read() 

if not all([s in content for s in (b'DAG', b'airflow')]): 

# todo: create ignore list 

return found_dags 

 

if mod_name in sys.modules: 

del sys.modules[mod_name] 

 

try: 

sys.path.insert(0, filepath) 

m = importlib.import_module(mod_name) 

mods.append(m) 

except Exception as e: 

self.logger.exception("Failed to import: " + filepath) 

self.import_errors[filepath] = str(e) 

self.file_last_changed[filepath] = dttm 

 

for m in mods: 

for dag in list(m.__dict__.values()): 

if isinstance(dag, DAG): 

if not dag.full_filepath: 

dag.full_filepath = filepath 

dag.is_subdag = False 

dag.module_name = m.__name__ 

self.bag_dag(dag, parent_dag=dag, root_dag=dag) 

found_dags.append(dag) 

found_dags += dag.subdags 

 

self.file_last_changed[filepath] = dttm 

return found_dags 

 

@provide_session 

def kill_zombies(self, session): 

""" 

Fails tasks that haven't had a heartbeat in too long 

""" 

from airflow.jobs import LocalTaskJob as LJ 

self.logger.info("Finding 'running' jobs without a recent heartbeat") 

TI = TaskInstance 

secs = ( 

configuration.getint('scheduler', 'job_heartbeat_sec') * 3) + 120 

limit_dttm = datetime.now() - timedelta(seconds=secs) 

self.logger.info( 

"Failing jobs without heartbeat after {}".format(limit_dttm)) 

 

tis = ( 

session.query(TI) 

.join(LJ, TI.job_id == LJ.id) 

.filter(TI.state == State.RUNNING) 

.filter( 

or_( 

LJ.state != State.RUNNING, 

LJ.latest_heartbeat < limit_dttm, 

)) 

.all() 

) 

 

for ti in tis: 

if ti and ti.dag_id in self.dags: 

dag = self.dags[ti.dag_id] 

if ti.task_id in dag.task_ids: 

task = dag.get_task(ti.task_id) 

ti.task = task 

ti.handle_failure("{} killed as zombie".format(ti)) 

self.logger.info( 

'Marked zombie job {} as failed'.format(ti)) 

session.commit() 

 

def bag_dag(self, dag, parent_dag, root_dag): 

""" 

Adds the DAG into the bag, recurses into sub dags. 

""" 

self.dags[dag.dag_id] = dag 

dag.resolve_template_files() 

dag.last_loaded = datetime.now() 

 

for task in dag.tasks: 

settings.policy(task) 

 

if self.sync_to_db: 

session = settings.Session() 

orm_dag = session.query( 

DagModel).filter(DagModel.dag_id == dag.dag_id).first() 

if not orm_dag: 

orm_dag = DagModel(dag_id=dag.dag_id) 

orm_dag.fileloc = root_dag.full_filepath 

orm_dag.is_subdag = dag.is_subdag 

orm_dag.owners = root_dag.owner 

orm_dag.is_active = True 

session.merge(orm_dag) 

session.commit() 

session.close() 

 

for subdag in dag.subdags: 

subdag.full_filepath = dag.full_filepath 

subdag.parent_dag = dag 

subdag.fileloc = root_dag.full_filepath 

subdag.is_subdag = True 

self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag) 

self.logger.debug('Loaded DAG {dag}'.format(**locals())) 

 

def collect_dags( 

self, 

dag_folder=None, 

only_if_updated=True): 

""" 

Given a file path or a folder, this method looks for python modules, 

imports them and adds them to the dagbag collection. 

 

Note that if a .airflowignore file is found while processing, 

the directory, it will behaves much like a .gitignore does, 

ignoring files that match any of the regex patterns specified 

in the file. 

""" 

start_dttm = datetime.now() 

dag_folder = dag_folder or self.dag_folder 

 

# Used to store stats around DagBag processing 

stats = [] 

FileLoadStat = namedtuple( 

'FileLoadStat', "file duration dag_num task_num dags") 

if os.path.isfile(dag_folder): 

self.process_file(dag_folder, only_if_updated=only_if_updated) 

elif os.path.isdir(dag_folder): 

patterns = [] 

for root, dirs, files in os.walk(dag_folder, followlinks=True): 

ignore_file = [f for f in files if f == '.airflowignore'] 

if ignore_file: 

f = open(os.path.join(root, ignore_file[0]), 'r') 

patterns += [p for p in f.read().split('\n') if p] 

f.close() 

for f in files: 

try: 

filepath = os.path.join(root, f) 

if not os.path.isfile(filepath): 

continue 

mod_name, file_ext = os.path.splitext( 

os.path.split(filepath)[-1]) 

if file_ext != '.py' and not zipfile.is_zipfile(filepath): 

continue 

if not any( 

[re.findall(p, filepath) for p in patterns]): 

ts = datetime.now() 

found_dags = self.process_file( 

filepath, only_if_updated=only_if_updated) 

 

td = datetime.now() - ts 

td = td.total_seconds() + ( 

float(td.microseconds) / 1000000) 

stats.append(FileLoadStat( 

filepath.replace(dag_folder, ''), 

td, 

len(found_dags), 

sum([len(dag.tasks) for dag in found_dags]), 

str([dag.dag_id for dag in found_dags]), 

)) 

except Exception as e: 

logging.warning(e) 

Stats.gauge( 

'collect_dags', (datetime.now() - start_dttm).total_seconds(), 1) 

Stats.gauge( 

'dagbag_size', len(self.dags), 1) 

Stats.gauge( 

'dagbag_import_errors', len(self.import_errors), 1) 

self.dagbag_stats = sorted( 

stats, key=lambda x: x.duration, reverse=True) 

 

def dagbag_report(self): 

"""Prints a report around DagBag loading stats""" 

report = textwrap.dedent("""\n 

------------------------------------------------------------------- 

DagBag loading stats for {dag_folder} 

------------------------------------------------------------------- 

Number of DAGs: {dag_num} 

Total task number: {task_num} 

DagBag parsing time: {duration} 

{table} 

""") 

stats = self.dagbag_stats 

return report.format( 

dag_folder=self.dag_folder, 

duration=sum([o.duration for o in stats]), 

dag_num=sum([o.dag_num for o in stats]), 

task_num=sum([o.dag_num for o in stats]), 

table=pprinttable(stats), 

) 

 

def deactivate_inactive_dags(self): 

active_dag_ids = [dag.dag_id for dag in list(self.dags.values())] 

session = settings.Session() 

for dag in session.query( 

DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all(): 

dag.is_active = False 

session.merge(dag) 

session.commit() 

session.close() 

 

def paused_dags(self): 

session = settings.Session() 

dag_ids = [dp.dag_id for dp in session.query(DagModel).filter( 

DagModel.is_paused == True)] 

session.commit() 

session.close() 

return dag_ids 

 

 

class User(Base): 

__tablename__ = "users" 

 

id = Column(Integer, primary_key=True) 

username = Column(String(ID_LEN), unique=True) 

email = Column(String(500)) 

superuser = False 

 

def __repr__(self): 

return self.username 

 

def get_id(self): 

return str(self.id) 

 

def is_superuser(self): 

return self.superuser 

 

 

class Connection(Base): 

""" 

Placeholder to store information about different database instances 

connection information. The idea here is that scripts use references to 

database instances (conn_id) instead of hard coding hostname, logins and 

passwords when using operators or hooks. 

""" 

__tablename__ = "connection" 

 

id = Column(Integer(), primary_key=True) 

conn_id = Column(String(ID_LEN)) 

conn_type = Column(String(500)) 

host = Column(String(500)) 

schema = Column(String(500)) 

login = Column(String(500)) 

_password = Column('password', String(5000)) 

port = Column(Integer()) 

is_encrypted = Column(Boolean, unique=False, default=False) 

is_extra_encrypted = Column(Boolean, unique=False, default=False) 

_extra = Column('extra', String(5000)) 

 

def __init__( 

self, conn_id=None, conn_type=None, 

host=None, login=None, password=None, 

schema=None, port=None, extra=None, 

uri=None): 

self.conn_id = conn_id 

if uri: 

self.parse_from_uri(uri) 

else: 

self.conn_type = conn_type 

self.host = host 

self.login = login 

self.password = password 

self.schema = schema 

self.port = port 

self.extra = extra 

 

def parse_from_uri(self, uri): 

temp_uri = urlparse(uri) 

hostname = temp_uri.hostname or '' 

if '%2f' in hostname: 

hostname = hostname.replace('%2f', '/').replace('%2F', '/') 

conn_type = temp_uri.scheme 

if conn_type == 'postgresql': 

conn_type = 'postgres' 

self.conn_type = conn_type 

self.host = hostname 

self.schema = temp_uri.path[1:] 

self.login = temp_uri.username 

self.password = temp_uri.password 

self.port = temp_uri.port 

 

def get_password(self): 

if self._password and self.is_encrypted: 

if not ENCRYPTION_ON: 

raise AirflowException( 

"Can't decrypt, configuration is missing") 

return FERNET.decrypt(bytes(self._password, 'utf-8')).decode() 

else: 

return self._password 

 

def set_password(self, value): 

if value: 

try: 

self._password = FERNET.encrypt(bytes(value, 'utf-8')).decode() 

self.is_encrypted = True 

except NameError: 

self._password = value 

self.is_encrypted = False 

 

@declared_attr 

def password(cls): 

return synonym('_password', 

descriptor=property(cls.get_password, cls.set_password)) 

 

def get_extra(self): 

if self._extra and self.is_extra_encrypted: 

if not ENCRYPTION_ON: 

raise AirflowException( 

"Can't decrypt `extra`, configuration is missing") 

return FERNET.decrypt(bytes(self._extra, 'utf-8')).decode() 

else: 

return self._extra 

 

def set_extra(self, value): 

if value: 

try: 

self._extra = FERNET.encrypt(bytes(value, 'utf-8')).decode() 

self.is_extra_encrypted = True 

except NameError: 

self._extra = value 

self.is_extra_encrypted = False 

 

@declared_attr 

def extra(cls): 

return synonym('_extra', 

descriptor=property(cls.get_extra, cls.set_extra)) 

 

def get_hook(self): 

try: 

if self.conn_type == 'mysql': 

from airflow.hooks.mysql_hook import MySqlHook 

return MySqlHook(mysql_conn_id=self.conn_id) 

elif self.conn_type == 'google_cloud_platform': 

from airflow.contrib.hooks.bigquery_hook import BigQueryHook 

return BigQueryHook(bigquery_conn_id=self.conn_id) 

elif self.conn_type == 'postgres': 

from airflow.hooks.postgres_hook import PostgresHook 

return PostgresHook(postgres_conn_id=self.conn_id) 

elif self.conn_type == 'hive_cli': 

from airflow.hooks.hive_hooks import HiveCliHook 

return HiveCliHook(hive_cli_conn_id=self.conn_id) 

elif self.conn_type == 'presto': 

from airflow.hooks.presto_hook import PrestoHook 

return PrestoHook(presto_conn_id=self.conn_id) 

elif self.conn_type == 'hiveserver2': 

from airflow.hooks.hive_hooks import HiveServer2Hook 

return HiveServer2Hook(hiveserver2_conn_id=self.conn_id) 

elif self.conn_type == 'sqlite': 

from airflow.hooks.sqlite_hook import SqliteHook 

return SqliteHook(sqlite_conn_id=self.conn_id) 

elif self.conn_type == 'jdbc': 

from airflow.hooks.jdbc_hook import JdbcHook 

return JdbcHook(jdbc_conn_id=self.conn_id) 

elif self.conn_type == 'mssql': 

from airflow.hooks.mssql_hook import MsSqlHook 

return MsSqlHook(mssql_conn_id=self.conn_id) 

elif self.conn_type == 'oracle': 

from airflow.hooks.oracle_hook import OracleHook 

return OracleHook(oracle_conn_id=self.conn_id) 

elif self.conn_type == 'vertica': 

from airflow.contrib.hooks.vertica_hook import VerticaHook 

return VerticaHook(vertica_conn_id=self.conn_id) 

elif self.conn_type == 'cloudant': 

from airflow.contrib.hooks.cloudant_hook import CloudantHook 

return CloudantHook(cloudant_conn_id=self.conn_id) 

except: 

return None 

 

def __repr__(self): 

return self.conn_id 

 

@property 

def extra_dejson(self): 

"""Returns the extra property by deserializing json""" 

obj = {} 

if self.extra: 

try: 

obj = json.loads(self.extra) 

except Exception as e: 

logging.exception(e) 

logging.error( 

"Failed parsing the json for " 

"conn_id {}".format(self.conn_id)) 

return obj 

 

 

class DagPickle(Base): 

""" 

Dags can originate from different places (user repos, master repo, ...) 

and also get executed in different places (different executors). This 

object represents a version of a DAG and becomes a source of truth for 

a BackfillJob execution. A pickle is a native python serialized object, 

and in this case gets stored in the database for the duration of the job. 

 

The executors pick up the DagPickle id and read the dag definition from 

the database. 

""" 

id = Column(Integer, primary_key=True) 

pickle = Column(PickleType(pickler=dill)) 

created_dttm = Column(DateTime, default=func.now()) 

pickle_hash = Column(Text) 

 

__tablename__ = "dag_pickle" 

 

def __init__(self, dag): 

self.dag_id = dag.dag_id 

if hasattr(dag, 'template_env'): 

dag.template_env = None 

self.pickle_hash = hash(dag) 

self.pickle = dag 

 

 

class TaskInstance(Base): 

""" 

Task instances store the state of a task instance. This table is the 

authority and single source of truth around what tasks have run and the 

state they are in. 

 

The SqlAchemy model doesn't have a SqlAlchemy foreign key to the task or 

dag model deliberately to have more control over transactions. 

 

Database transactions on this table should insure double triggers and 

any confusion around what task instances are or aren't ready to run 

even while multiple schedulers may be firing task instances. 

""" 

 

__tablename__ = "task_instance" 

 

task_id = Column(String(ID_LEN), primary_key=True) 

dag_id = Column(String(ID_LEN), primary_key=True) 

execution_date = Column(DateTime, primary_key=True) 

start_date = Column(DateTime) 

end_date = Column(DateTime) 

duration = Column(Float) 

state = Column(String(20)) 

try_number = Column(Integer, default=0) 

hostname = Column(String(1000)) 

unixname = Column(String(1000)) 

job_id = Column(Integer) 

pool = Column(String(50)) 

queue = Column(String(50)) 

priority_weight = Column(Integer) 

operator = Column(String(1000)) 

queued_dttm = Column(DateTime) 

 

__table_args__ = ( 

Index('ti_dag_state', dag_id, state), 

Index('ti_state_lkp', dag_id, task_id, execution_date, state), 

Index('ti_pool', pool, state, priority_weight), 

) 

 

def __init__(self, task, execution_date, state=None): 

self.dag_id = task.dag_id 

self.task_id = task.task_id 

self.execution_date = execution_date 

self.task = task 

self.queue = task.queue 

self.pool = task.pool 

self.priority_weight = task.priority_weight_total 

self.try_number = 0 

self.test_mode = False # can be changed when calling 'run' 

self.force = False # can be changed when calling 'run' 

self.unixname = getpass.getuser() 

if state: 

self.state = state 

 

def command( 

self, 

mark_success=False, 

ignore_dependencies=False, 

ignore_depends_on_past=False, 

force=False, 

local=False, 

pickle_id=None, 

raw=False, 

job_id=None, 

pool=None): 

""" 

Returns a command that can be executed anywhere where airflow is 

installed. This command is part of the message sent to executors by 

the orchestrator. 

""" 

dag = self.task.dag 

iso = self.execution_date.isoformat() 

cmd = "airflow run {self.dag_id} {self.task_id} {iso} " 

cmd += "--mark_success " if mark_success else "" 

cmd += "--pickle {pickle_id} " if pickle_id else "" 

cmd += "--job_id {job_id} " if job_id else "" 

cmd += "-i " if ignore_dependencies else "" 

cmd += "-I " if ignore_depends_on_past else "" 

cmd += "--force " if force else "" 

cmd += "--local " if local else "" 

cmd += "--pool {pool} " if pool else "" 

cmd += "--raw " if raw else "" 

if not pickle_id and dag: 

if dag.full_filepath != dag.filepath: 

cmd += "-sd DAGS_FOLDER/{dag.filepath} " 

elif dag.full_filepath: 

cmd += "-sd {dag.full_filepath}" 

return cmd.format(**locals()) 

 

@property 

def log_filepath(self): 

iso = self.execution_date.isoformat() 

log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER')) 

return ( 

"{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals())) 

 

@property 

def log_url(self): 

iso = self.execution_date.isoformat() 

BASE_URL = configuration.get('webserver', 'BASE_URL') 

return BASE_URL + ( 

"/admin/airflow/log" 

"?dag_id={self.dag_id}" 

"&task_id={self.task_id}" 

"&execution_date={iso}" 

).format(**locals()) 

 

@property 

def mark_success_url(self): 

iso = self.execution_date.isoformat() 

BASE_URL = configuration.get('webserver', 'BASE_URL') 

return BASE_URL + ( 

"/admin/airflow/action" 

"?action=success" 

"&task_id={self.task_id}" 

"&dag_id={self.dag_id}" 

"&execution_date={iso}" 

"&upstream=false" 

"&downstream=false" 

).format(**locals()) 

 

@provide_session 

def current_state(self, session=None): 

""" 

Get the very latest state from the database, if a session is passed, 

we use and looking up the state becomes part of the session, otherwise 

a new session is used. 

""" 

TI = TaskInstance 

ti = session.query(TI).filter( 

TI.dag_id == self.dag_id, 

TI.task_id == self.task_id, 

TI.execution_date == self.execution_date, 

).all() 

if ti: 

state = ti[0].state 

else: 

state = None 

return state 

 

@provide_session 

def error(self, session=None): 

""" 

Forces the task instance's state to FAILED in the database. 

""" 

logging.error("Recording the task instance as FAILED") 

self.state = State.FAILED 

session.merge(self) 

session.commit() 

 

@provide_session 

def refresh_from_db(self, session=None, lock_for_update=False): 

""" 

Refreshes the task instance from the database based on the primary key 

 

:param lock_for_update: if True, indicates that the database should 

lock the TaskInstance (issuing a FOR UPDATE clause) until the session 

is committed. 

""" 

TI = TaskInstance 

 

qry = session.query(TI).filter( 

TI.dag_id == self.dag_id, 

TI.task_id == self.task_id, 

TI.execution_date == self.execution_date) 

 

if lock_for_update: 

ti = qry.with_for_update().first() 

else: 

ti = qry.first() 

if ti: 

self.state = ti.state 

self.start_date = ti.start_date 

self.end_date = ti.end_date 

self.try_number = ti.try_number 

else: 

self.state = None 

 

@provide_session 

def clear_xcom_data(self, session=None): 

""" 

Clears all XCom data from the database for the task instance 

""" 

session.query(XCom).filter( 

XCom.dag_id == self.dag_id, 

XCom.task_id == self.task_id, 

XCom.execution_date == self.execution_date 

).delete() 

session.commit() 

 

@property 

def key(self): 

""" 

Returns a tuple that identifies the task instance uniquely 

""" 

return (self.dag_id, self.task_id, self.execution_date) 

 

def set_state(self, state, session): 

self.state = state 

self.start_date = datetime.now() 

self.end_date = datetime.now() 

session.merge(self) 

session.commit() 

 

def is_queueable( 

self, 

include_queued=False, 

ignore_depends_on_past=False, 

flag_upstream_failed=False): 

""" 

Returns a boolean on whether the task instance has met all dependencies 

and is ready to run. It considers the task's state, the state 

of its dependencies, depends_on_past and makes sure the execution 

isn't in the future. It doesn't take into 

account whether the pool has a slot for it to run. 

 

:param include_queued: If True, tasks that have already been queued 

are included. Defaults to False. 

:type include_queued: boolean 

:param ignore_depends_on_past: if True, ignores depends_on_past 

dependencies. Defaults to False. 

:type ignore_depends_on_past: boolean 

:param flag_upstream_failed: This is a hack to generate 

the upstream_failed state creation while checking to see 

whether the task instance is runnable. It was the shortest 

path to add the feature 

:type flag_upstream_failed: boolean 

""" 

# is the execution date in the future? 

if self.execution_date > datetime.now(): 

return False 

# is the task still in the retry waiting period? 

elif self.is_premature(): 

return False 

# does the task have an end_date prior to the execution date? 

elif self.task.end_date and self.execution_date > self.task.end_date: 

return False 

# has the task been skipped? 

elif self.state == State.SKIPPED: 

return False 

# has the task already been queued (and are we excluding queued tasks)? 

elif self.state == State.QUEUED and not include_queued: 

return False 

# is the task runnable and have its dependencies been met? 

elif ( 

self.state in State.runnable() and 

self.are_dependencies_met( 

ignore_depends_on_past=ignore_depends_on_past, 

flag_upstream_failed=flag_upstream_failed)): 

return True 

# anything else 

else: 

return False 

 

def is_premature(self): 

""" 

Returns whether a task is in UP_FOR_RETRY state and its retry interval 

has elapsed. 

""" 

# is the task still in the retry waiting period? 

return self.state == State.UP_FOR_RETRY and not self.ready_for_retry() 

 

def is_runnable( 

self, 

include_queued=False, 

ignore_depends_on_past=False, 

flag_upstream_failed=False): 

""" 

Returns whether a task is ready to run AND there's room in the 

queue. 

 

:param include_queued: If True, tasks that are already QUEUED are 

considered "runnable". Defaults to False. 

:type include_queued: boolean 

:param ignore_depends_on_past: if True, ignores depends_on_past 

dependencies. Defaults to False. 

:type ignore_depends_on_past: boolean 

""" 

queueable = self.is_queueable( 

include_queued=include_queued, 

ignore_depends_on_past=ignore_depends_on_past, 

flag_upstream_failed=flag_upstream_failed) 

return queueable and not self.pool_full() 

 

@provide_session 

def are_dependents_done(self, session=None): 

""" 

Checks whether the dependents of this task instance have all succeeded. 

This is meant to be used by wait_for_downstream. 

 

This is useful when you do not want to start processing the next 

schedule of a task until the dependents are done. For instance, 

if the task DROPs and recreates a table. 

""" 

task = self.task 

 

if not task.downstream_task_ids: 

return True 

 

ti = session.query(func.count(TaskInstance.task_id)).filter( 

TaskInstance.dag_id == self.dag_id, 

TaskInstance.task_id.in_(task.downstream_task_ids), 

TaskInstance.execution_date == self.execution_date, 

TaskInstance.state == State.SUCCESS, 

) 

count = ti[0][0] 

return count == len(task.downstream_task_ids) 

 

@provide_session 

def evaluate_trigger_rule(self, successes, skipped, failed, 

upstream_failed, done, 

flag_upstream_failed, session=None): 

""" 

Returns a boolean on whether the current task can be scheduled 

for execution based on its trigger_rule. 

 

:param flag_upstream_failed: This is a hack to generate 

the upstream_failed state creation while checking to see 

whether the task instance is runnable. It was the shortest 

path to add the feature 

:type flag_upstream_failed: boolean 

:param successes: Number of successful upstream tasks 

:type successes: boolean 

:param skipped: Number of skipped upstream tasks 

:type skipped: boolean 

:param failed: Number of failed upstream tasks 

:type failed: boolean 

:param upstream_failed: Number of upstream_failed upstream tasks 

:type upstream_failed: boolean 

:param done: Number of completed upstream tasks 

:type done: boolean 

""" 

TR = TriggerRule 

 

task = self.task 

upstream = len(task.upstream_task_ids) 

tr = task.trigger_rule 

upstream_done = done >= upstream 

 

# handling instant state assignment based on trigger rules 

if flag_upstream_failed: 

if tr == TR.ALL_SUCCESS: 

if upstream_failed or failed: 

self.set_state(State.UPSTREAM_FAILED, session) 

elif skipped: 

self.set_state(State.SKIPPED, session) 

elif tr == TR.ALL_FAILED: 

if successes or skipped: 

self.set_state(State.SKIPPED, session) 

elif tr == TR.ONE_SUCCESS: 

if upstream_done and not successes: 

self.set_state(State.SKIPPED, session) 

elif tr == TR.ONE_FAILED: 

if upstream_done and not (failed or upstream_failed): 

self.set_state(State.SKIPPED, session) 

 

return ( 

(tr == TR.ONE_SUCCESS and successes > 0) or 

(tr == TR.ONE_FAILED and (failed or upstream_failed)) or 

(tr == TR.ALL_SUCCESS and successes >= upstream) or 

(tr == TR.ALL_FAILED and failed + upstream_failed >= upstream) or 

(tr == TR.ALL_DONE and upstream_done) 

) 

 

@provide_session 

def are_dependencies_met( 

self, 

session=None, 

flag_upstream_failed=False, 

ignore_depends_on_past=False, 

verbose=False): 

""" 

Returns a boolean on whether the upstream tasks are in a SUCCESS state 

and considers depends_on_past and the previous run's state. 

 

:param flag_upstream_failed: This is a hack to generate 

the upstream_failed state creation while checking to see 

whether the task instance is runnable. It was the shortest 

path to add the feature 

:type flag_upstream_failed: boolean 

:param ignore_depends_on_past: if True, ignores depends_on_past 

dependencies. Defaults to False. 

:type ignore_depends_on_past: boolean 

:param verbose: verbose provides more logging in the case where the 

task instance is evaluated as a check right before being executed. 

In the case of the scheduler evaluating the dependencies, this 

logging would be way too verbose. 

:type verbose: boolean 

""" 

TI = TaskInstance 

TR = TriggerRule 

 

task = self.task 

 

# Checking that the depends_on_past is fulfilled 

if (task.depends_on_past and not ignore_depends_on_past and 

not self.execution_date == task.start_date): 

previous_ti = session.query(TI).filter( 

TI.dag_id == self.dag_id, 

TI.task_id == task.task_id, 

TI.execution_date == 

self.task.dag.previous_schedule(self.execution_date), 

TI.state.in_({State.SUCCESS, State.SKIPPED}), 

).first() 

if not previous_ti: 

if verbose: 

logging.warning("depends_on_past not satisfied") 

return False 

 

# Applying wait_for_downstream 

previous_ti.task = self.task 

if task.wait_for_downstream and not \ 

previous_ti.are_dependents_done(session=session): 

if verbose: 

logging.warning("wait_for_downstream not satisfied") 

return False 

 

# Checking that all upstream dependencies have succeeded 

if not task.upstream_list or task.trigger_rule == TR.DUMMY: 

return True 

 

# todo: this query becomes quite expensive with dags that have 

# many tasks. It should be refactored to let the task report 

# to the dag run and get the aggregates from there 

qry = ( 

session 

.query( 

func.coalesce(func.sum( 

case([(TI.state == State.SUCCESS, 1)], else_=0)), 0), 

func.coalesce(func.sum( 

case([(TI.state == State.SKIPPED, 1)], else_=0)), 0), 

func.coalesce(func.sum( 

case([(TI.state == State.FAILED, 1)], else_=0)), 0), 

func.coalesce(func.sum( 

case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0), 

func.count(TI.task_id), 

) 

.filter( 

TI.dag_id == self.dag_id, 

TI.task_id.in_(task.upstream_task_ids), 

TI.execution_date == self.execution_date, 

TI.state.in_([ 

State.SUCCESS, State.FAILED, 

State.UPSTREAM_FAILED, State.SKIPPED]), 

) 

) 

 

successes, skipped, failed, upstream_failed, done = qry.first() 

 

satisfied = self.evaluate_trigger_rule( 

session=session, successes=successes, skipped=skipped, 

failed=failed, upstream_failed=upstream_failed, done=done, 

flag_upstream_failed=flag_upstream_failed) 

 

if verbose and not satisfied: 

logging.warning("Trigger rule `{}` not satisfied".format(task.trigger_rule)) 

return satisfied 

 

def __repr__(self): 

return ( 

"<TaskInstance: {ti.dag_id}.{ti.task_id} " 

"{ti.execution_date} [{ti.state}]>" 

).format(ti=self) 

 

def ready_for_retry(self): 

""" 

Checks on whether the task instance is in the right state and timeframe 

to be retried. 

""" 

return self.state == State.UP_FOR_RETRY and \ 

self.end_date + self.task.retry_delay < datetime.now() 

 

@provide_session 

def pool_full(self, session): 

""" 

Returns a boolean as to whether the slot pool has room for this 

task to run 

""" 

if not self.task.pool: 

return False 

 

pool = ( 

session 

.query(Pool) 

.filter(Pool.pool == self.task.pool) 

.first() 

) 

if not pool: 

raise ValueError( 

"Task specified a pool ({}) but the pool " 

"doesn't exist!".format(self.task.pool)) 

open_slots = pool.open_slots(session=session) 

 

return open_slots <= 0 

 

@provide_session 

def run( 

self, 

verbose=True, 

ignore_dependencies=False, 

ignore_depends_on_past=False, 

force=False, 

mark_success=False, 

test_mode=False, 

job_id=None, 

pool=None, 

session=None): 

""" 

Runs the task instance. 

 

:param verbose: whether to turn on more verbose loggin 

:type verbose: boolean 

:param ignore_dependencies: Doesn't check for deps, just runs 

:type ignore_dependencies: boolean 

:param ignore_depends_on_past: Ignore depends_on_past but respect 

other dependencies 

:type ignore_depends_on_past: boolean 

:param force: Forces a run regarless of previous success 

:type force: boolean 

:param mark_success: Don't run the task, mark its state as success 

:type mark_success: boolean 

:param test_mode: Doesn't record success or failure in the DB 

:type test_mode: boolean 

:param pool: specifies the pool to use to run the task instance 

:type pool: str 

""" 

task = self.task 

self.pool = pool or task.pool 

self.test_mode = test_mode 

self.force = force 

self.refresh_from_db(session=session, lock_for_update=True) 

self.job_id = job_id 

iso = datetime.now().isoformat() 

self.hostname = socket.getfqdn() 

self.operator = task.__class__.__name__ 

 

if self.state == State.RUNNING: 

logging.warning("Another instance is running, skipping.") 

elif self.state == State.REMOVED: 

self.clear_xcom_data() 

logging.debug("Task {} was removed from the dag".format(self)) 

elif not force and self.state == State.SUCCESS: 

logging.info( 

"Task {self} previously succeeded" 

" on {self.end_date}".format(**locals()) 

) 

Stats.incr('previously_succeeded', 1, 1) 

elif ( 

not ignore_dependencies and 

not self.are_dependencies_met( 

session=session, 

ignore_depends_on_past=ignore_depends_on_past, 

verbose=True)): 

logging.warning("Dependencies not met yet") 

elif ( 

# todo: move this to the scheduler 

self.state == State.UP_FOR_RETRY and 

not self.ready_for_retry()): 

next_run = (self.end_date + task.retry_delay).isoformat() 

logging.info( 

"Not ready for retry yet. " + 

"Next run after {0}".format(next_run) 

) 

elif force or self.state in State.runnable(): 

self.clear_xcom_data() 

HR = "\n" + ("-" * 80) + "\n" # Line break 

 

# For reporting purposes, we report based on 1-indexed, 

# not 0-indexed lists (i.e. Attempt 1 instead of 

# Attempt 0 for the first attempt). 

msg = "Starting attempt {attempt} of {total}".format( 

attempt=self.try_number % (task.retries + 1) + 1, 

total=task.retries + 1) 

self.start_date = datetime.now() 

 

if not mark_success and self.state != State.QUEUED and ( 

self.pool or self.task.dag.concurrency_reached): 

# If a pool is set for this task, marking the task instance 

# as QUEUED 

self.state = State.QUEUED 

msg = "Queuing attempt {attempt} of {total}".format( 

attempt=self.try_number % (task.retries + 1) + 1, 

total=task.retries + 1) 

logging.info(HR + msg + HR) 

 

self.queued_dttm = datetime.now() 

session.merge(self) 

session.commit() 

logging.info("Queuing into pool {}".format(self.pool)) 

return 

 

# print status message 

logging.info(HR + msg + HR) 

self.try_number += 1 

 

if not test_mode: 

session.add(Log(State.RUNNING, self)) 

self.state = State.RUNNING 

self.end_date = None 

if not test_mode: 

session.merge(self) 

session.commit() 

 

# Closing all pooled connections to prevent 

# "max number of connections reached" 

settings.engine.dispose() 

if verbose: 

if mark_success: 

msg = "Marking success for " 

else: 

msg = "Executing " 

msg += "{self.task} on {self.execution_date}" 

 

context = {} 

try: 

logging.info(msg.format(self=self)) 

if not mark_success: 

context = self.get_template_context() 

 

task_copy = copy.copy(task) 

self.task = task_copy 

 

def signal_handler(signum, frame): 

'''Setting kill signal handler''' 

logging.error("Killing subprocess") 

task_copy.on_kill() 

raise AirflowException("Task received SIGTERM signal") 

signal.signal(signal.SIGTERM, signal_handler) 

 

self.render_templates() 

task_copy.pre_execute(context=context) 

 

# If a timout is specified for the task, make it fail 

# if it goes beyond 

result = None 

if task_copy.execution_timeout: 

with timeout(int( 

task_copy.execution_timeout.total_seconds())): 

result = task_copy.execute(context=context) 

 

else: 

result = task_copy.execute(context=context) 

 

# If the task returns a result, push an XCom containing it 

if result is not None: 

self.xcom_push(key=XCOM_RETURN_KEY, value=result) 

 

task_copy.post_execute(context=context) 

self.state = State.SUCCESS 

except AirflowSkipException: 

self.state = State.SKIPPED 

except (Exception, KeyboardInterrupt) as e: 

self.handle_failure(e, test_mode, context) 

raise 

 

# Recording SUCCESS 

self.end_date = datetime.now() 

self.set_duration() 

if not test_mode: 

session.add(Log(self.state, self)) 

session.merge(self) 

session.commit() 

 

# Success callback 

try: 

if task.on_success_callback: 

task.on_success_callback(context) 

except Exception as e3: 

logging.error("Failed when executing success callback") 

logging.exception(e3) 

 

session.commit() 

 

def dry_run(self): 

task = self.task 

task_copy = copy.copy(task) 

self.task = task_copy 

 

self.render_templates() 

task_copy.dry_run() 

 

def handle_failure(self, error, test_mode=False, context=None): 

logging.exception(error) 

task = self.task 

session = settings.Session() 

self.end_date = datetime.now() 

self.set_duration() 

if not test_mode: 

session.add(Log(State.FAILED, self)) 

 

# Let's go deeper 

try: 

if task.retries and self.try_number % (task.retries + 1) != 0: 

self.state = State.UP_FOR_RETRY 

logging.info('Marking task as UP_FOR_RETRY') 

if task.email_on_retry and task.email: 

self.email_alert(error, is_retry=True) 

else: 

self.state = State.FAILED 

if task.retries: 

logging.info('All retries failed; marking task as FAILED') 

else: 

logging.info('Marking task as FAILED.') 

if task.email_on_failure and task.email: 

self.email_alert(error, is_retry=False) 

except Exception as e2: 

logging.error( 

'Failed to send email to: ' + str(task.email)) 

logging.exception(e2) 

 

# Handling callbacks pessimistically 

try: 

if self.state == State.UP_FOR_RETRY and task.on_retry_callback: 

task.on_retry_callback(context) 

if self.state == State.FAILED and task.on_failure_callback: 

task.on_failure_callback(context) 

except Exception as e3: 

logging.error("Failed at executing callback") 

logging.exception(e3) 

 

if not test_mode: 

session.merge(self) 

session.commit() 

logging.error(str(error)) 

 

@provide_session 

def get_template_context(self, session=None): 

task = self.task 

from airflow import macros 

tables = None 

if 'tables' in task.params: 

tables = task.params['tables'] 

 

ds = self.execution_date.isoformat()[:10] 

ts = self.execution_date.isoformat() 

yesterday_ds = (self.execution_date - timedelta(1)).isoformat()[:10] 

tomorrow_ds = (self.execution_date + timedelta(1)).isoformat()[:10] 

 

ds_nodash = ds.replace('-', '') 

ts_nodash = ts.replace('-', '').replace(':', '') 

yesterday_ds_nodash = yesterday_ds.replace('-', '') 

tomorrow_ds_nodash = tomorrow_ds.replace('-', '') 

 

ti_key_str = "{task.dag_id}__{task.task_id}__{ds_nodash}" 

ti_key_str = ti_key_str.format(**locals()) 

 

params = {} 

run_id = '' 

dag_run = None 

if hasattr(task, 'dag'): 

if task.dag.params: 

params.update(task.dag.params) 

dag_run = ( 

session.query(DagRun) 

.filter_by( 

dag_id=task.dag.dag_id, 

execution_date=self.execution_date) 

.first() 

) 

run_id = dag_run.run_id if dag_run else None 

session.expunge_all() 

session.commit() 

 

if task.params: 

params.update(task.params) 

 

class VariableAccessor: 

""" 

Wrapper around Variable. This way you can get variables in templates by using 

{var.variable_name}. 

""" 

def __init__(self): 

pass 

 

def __getattr__(self, item): 

return Variable.get(item) 

 

class VariableJsonAccessor: 

def __init__(self): 

pass 

 

def __getattr__(self, item): 

return Variable.get(item, deserialize_json=True) 

 

return { 

'dag': task.dag, 

'ds': ds, 

'ds_nodash': ds_nodash, 

'ts': ts, 

'ts_nodash': ts_nodash, 

'yesterday_ds': yesterday_ds, 

'yesterday_ds_nodash': yesterday_ds_nodash, 

'tomorrow_ds': tomorrow_ds, 

'tomorrow_ds_nodash': tomorrow_ds_nodash, 

'END_DATE': ds, 

'end_date': ds, 

'dag_run': dag_run, 

'run_id': run_id, 

'execution_date': self.execution_date, 

'latest_date': ds, 

'macros': macros, 

'params': params, 

'tables': tables, 

'task': task, 

'task_instance': self, 

'ti': self, 

'task_instance_key_str': ti_key_str, 

'conf': configuration, 

'test_mode': self.test_mode, 

'var': { 

'value': VariableAccessor(), 

'json': VariableJsonAccessor() 

} 

} 

 

def render_templates(self): 

task = self.task 

jinja_context = self.get_template_context() 

if hasattr(self, 'task') and hasattr(self.task, 'dag'): 

if self.task.dag.user_defined_macros: 

jinja_context.update( 

self.task.dag.user_defined_macros) 

 

rt = self.task.render_template # shortcut to method 

for attr in task.__class__.template_fields: 

content = getattr(task, attr) 

if content: 

rendered_content = rt(attr, content, jinja_context) 

setattr(task, attr, rendered_content) 

 

def email_alert(self, exception, is_retry=False): 

task = self.task 

title = "Airflow alert: {self}".format(**locals()) 

exception = str(exception).replace('\n', '<br>') 

try_ = task.retries + 1 

body = ( 

"Try {self.try_number} out of {try_}<br>" 

"Exception:<br>{exception}<br>" 

"Log: <a href='{self.log_url}'>Link</a><br>" 

"Host: {self.hostname}<br>" 

"Log file: {self.log_filepath}<br>" 

"Mark success: <a href='{self.mark_success_url}'>Link</a><br>" 

).format(**locals()) 

send_email(task.email, title, body) 

 

def set_duration(self): 

if self.end_date and self.start_date: 

self.duration = (self.end_date - self.start_date).total_seconds() 

else: 

self.duration = None 

 

def xcom_push( 

self, 

key, 

value, 

execution_date=None): 

""" 

Make an XCom available for tasks to pull. 

 

:param key: A key for the XCom 

:type key: string 

:param value: A value for the XCom. The value is pickled and stored 

in the database. 

:type value: any pickleable object 

:param execution_date: if provided, the XCom will not be visible until 

this date. This can be used, for example, to send a message to a 

task on a future date without it being immediately visible. 

:type execution_date: datetime 

""" 

 

if execution_date and execution_date < self.execution_date: 

raise ValueError( 

'execution_date can not be in the past (current ' 

'execution_date is {}; received {})'.format( 

self.execution_date, execution_date)) 

 

XCom.set( 

key=key, 

value=value, 

task_id=self.task_id, 

dag_id=self.dag_id, 

execution_date=execution_date or self.execution_date) 

 

def xcom_pull( 

self, 

task_ids, 

dag_id=None, 

key=XCOM_RETURN_KEY, 

include_prior_dates=False): 

""" 

Pull XComs that optionally meet certain criteria. 

 

The default value for `key` limits the search to XComs 

that were returned by other tasks (as opposed to those that were pushed 

manually). To remove this filter, pass key=None (or any desired value). 

 

If a single task_id string is provided, the result is the value of the 

most recent matching XCom from that task_id. If multiple task_ids are 

provided, a tuple of matching values is returned. None is returned 

whenever no matches are found. 

 

:param key: A key for the XCom. If provided, only XComs with matching 

keys will be returned. The default key is 'return_value', also 

available as a constant XCOM_RETURN_KEY. This key is automatically 

given to XComs returned by tasks (as opposed to being pushed 

manually). To remove the filter, pass key=None. 

:type key: string 

:param task_ids: Only XComs from tasks with matching ids will be 

pulled. Can pass None to remove the filter. 

:type task_ids: string or iterable of strings (representing task_ids) 

:param dag_id: If provided, only pulls XComs from this DAG. 

If None (default), the DAG of the calling task is used. 

:type dag_id: string 

:param include_prior_dates: If False, only XComs from the current 

execution_date are returned. If True, XComs from previous dates 

are returned as well. 

:type include_prior_dates: bool 

""" 

 

if dag_id is None: 

dag_id = self.dag_id 

 

pull_fn = functools.partial( 

XCom.get_one, 

execution_date=self.execution_date, 

key=key, 

dag_id=dag_id, 

include_prior_dates=include_prior_dates) 

 

if is_container(task_ids): 

return tuple(pull_fn(task_id=t) for t in task_ids) 

else: 

return pull_fn(task_id=task_ids) 

 

 

class Log(Base): 

""" 

Used to actively log events to the database 

""" 

 

__tablename__ = "log" 

 

id = Column(Integer, primary_key=True) 

dttm = Column(DateTime) 

dag_id = Column(String(ID_LEN)) 

task_id = Column(String(ID_LEN)) 

event = Column(String(30)) 

execution_date = Column(DateTime) 

owner = Column(String(500)) 

extra = Column(Text) 

 

def __init__(self, event, task_instance, owner=None, extra=None, **kwargs): 

self.dttm = datetime.now() 

self.event = event 

self.extra = extra 

 

task_owner = None 

 

if task_instance: 

self.dag_id = task_instance.dag_id 

self.task_id = task_instance.task_id 

self.execution_date = task_instance.execution_date 

task_owner = task_instance.task.owner 

 

if 'task_id' in kwargs: 

self.task_id = kwargs['task_id'] 

if 'dag_id' in kwargs: 

self.dag_id = kwargs['dag_id'] 

if 'execution_date' in kwargs: 

if kwargs['execution_date']: 

self.execution_date = kwargs['execution_date'] 

 

self.owner = owner or task_owner 

 

 

@functools.total_ordering 

class BaseOperator(object): 

""" 

Abstract base class for all operators. Since operators create objects that 

become node in the dag, BaseOperator contains many recursive methods for 

dag crawling behavior. To derive this class, you are expected to override 

the constructor as well as the 'execute' method. 

 

Operators derived from this task should perform or trigger certain tasks 

synchronously (wait for completion). Example of operators could be an 

operator the runs a Pig job (PigOperator), a sensor operator that 

waits for a partition to land in Hive (HiveSensorOperator), or one that 

moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these 

operators (tasks) target specific operations, running specific scripts, 

functions or data transfers. 

 

This class is abstract and shouldn't be instantiated. Instantiating a 

class derived from this one results in the creation of a task object, 

which ultimately becomes a node in DAG objects. Task dependencies should 

be set by using the set_upstream and/or set_downstream methods. 

 

Note that this class is derived from SQLAlchemy's Base class, which 

allows us to push metadata regarding tasks to the database. Deriving this 

classes needs to implement the polymorphic specificities documented in 

SQLAlchemy. This should become clear while reading the code for other 

operators. 

 

:param task_id: a unique, meaningful id for the task 

:type task_id: string 

:param owner: the owner of the task, using the unix username is recommended 

:type owner: string 

:param retries: the number of retries that should be performed before 

failing the task 

:type retries: int 

:param retry_delay: delay between retries 

:type retry_delay: timedelta 

:param start_date: The ``start_date`` for the task, determines 

the ``execution_date`` for the first task instance. The best practice 

is to have the start_date rounded 

to your DAG's ``schedule_interval``. Daily jobs have their start_date 

some day at 00:00:00, hourly jobs have their start_date at 00:00 

of a specific hour. Note that Airflow simply looks at the latest 

``execution_date`` and adds the ``schedule_interval`` to determine 

the next ``execution_date``. It is also very important 

to note that different tasks' dependencies 

need to line up in time. If task A depends on task B and their 

start_date are offset in a way that their execution_date don't line 

up, A's dependencies will never be met. If you are looking to delay 

a task, for example running a daily task at 2AM, look into the 

``TimeSensor`` and ``TimeDeltaSensor``. We advise against using 

dynamic ``start_date`` and recommend using fixed ones. Read the 

FAQ entry about start_date for more information. 

:type start_date: datetime 

:param end_date: if specified, the scheduler won't go beyond this date 

:type end_date: datetime 

:param depends_on_past: when set to true, task instances will run 

sequentially while relying on the previous task's schedule to 

succeed. The task instance for the start_date is allowed to run. 

:type depends_on_past: bool 

:param wait_for_downstream: when set to true, an instance of task 

X will wait for tasks immediately downstream of the previous instance 

of task X to finish successfully before it runs. This is useful if the 

different instances of a task X alter the same asset, and this asset 

is used by tasks downstream of task X. Note that depends_on_past 

is forced to True wherever wait_for_downstream is used. 

:type wait_for_downstream: bool 

:param queue: which queue to target when running this job. Not 

all executors implement queue management, the CeleryExecutor 

does support targeting specific queues. 

:type queue: str 

:param dag: a reference to the dag the task is attached to (if any) 

:type dag: DAG 

:param priority_weight: priority weight of this task against other task. 

This allows the executor to trigger higher priority tasks before 

others when things get backed up. 

:type priority_weight: int 

:param pool: the slot pool this task should run in, slot pools are a 

way to limit concurrency for certain tasks 

:type pool: str 

:param sla: time by which the job is expected to succeed. Note that 

this represents the ``timedelta`` after the period is closed. For 

example if you set an SLA of 1 hour, the scheduler would send dan email 

soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance 

has not succeede yet. 

The scheduler pays special attention for jobs with an SLA and 

sends alert 

emails for sla misses. SLA misses are also recorded in the database 

for future reference. All tasks that share the same SLA time 

get bundled in a single email, sent soon after that time. SLA 

notification are sent once and only once for each task instance. 

:type sla: datetime.timedelta 

:param execution_timeout: max time allowed for the execution of 

this task instance, if it goes beyond it will raise and fail. 

:type execution_timeout: datetime.timedelta 

:param on_failure_callback: a function to be called when a task instance 

of this task fails. a context dictionary is passed as a single 

parameter to this function. Context contains references to related 

objects to the task instance and is documented under the macros 

section of the API. 

:type on_failure_callback: callable 

:param on_retry_callback: much like the ``on_failure_callback`` excepts 

that it is executed when retries occur. 

:param on_success_callback: much like the ``on_failure_callback`` excepts 

that it is executed when the task succeeds. 

:type on_success_callback: callable 

:param trigger_rule: defines the rule by which dependencies are applied 

for the task to get triggered. Options are: 

``{ all_success | all_failed | all_done | one_success | 

one_failed | dummy}`` 

default is ``all_success``. Options can be set as string or 

using the constants defined in the static class 

``airflow.utils.TriggerRule`` 

:type trigger_rule: str 

""" 

 

# For derived classes to define which fields will get jinjaified 

template_fields = [] 

# Defines wich files extensions to look for in the templated fields 

template_ext = [] 

# Defines the color in the UI 

ui_color = '#fff' 

ui_fgcolor = '#000' 

 

@apply_defaults 

def __init__( 

self, 

task_id, 

owner=configuration.get('operators', 'DEFAULT_OWNER'), 

email=None, 

email_on_retry=True, 

email_on_failure=True, 

retries=0, 

retry_delay=timedelta(seconds=300), 

start_date=None, 

end_date=None, 

schedule_interval=None, # not hooked as of now 

depends_on_past=False, 

wait_for_downstream=False, 

dag=None, 

params=None, 

default_args=None, 

adhoc=False, 

priority_weight=1, 

queue=configuration.get('celery', 'default_queue'), 

pool=None, 

sla=None, 

execution_timeout=None, 

on_failure_callback=None, 

on_success_callback=None, 

on_retry_callback=None, 

trigger_rule=TriggerRule.ALL_SUCCESS, 

*args, 

**kwargs): 

 

if args or kwargs: 

# TODO remove *args and **kwargs in Airflow 2.0 

warnings.warn( 

'Invalid arguments were passed to {c}. Support for ' 

'passing such arguments will be dropped in Airflow 2.0. ' 

'Invalid arguments were:' 

'\n*args: {a}\n**kwargs: {k}'.format( 

c=self.__class__.__name__, a=args, k=kwargs), 

category=PendingDeprecationWarning 

) 

 

validate_key(task_id) 

self.task_id = task_id 

self.owner = owner 

self.email = email 

self.email_on_retry = email_on_retry 

self.email_on_failure = email_on_failure 

self.start_date = start_date 

if start_date and not isinstance(start_date, datetime): 

logging.warning( 

"start_date for {} isn't datetime.datetime".format(self)) 

self.end_date = end_date 

if not TriggerRule.is_valid(trigger_rule): 

raise AirflowException( 

"The trigger_rule must be one of {all_triggers}," 

"'{d}.{t}'; received '{tr}'." 

.format(all_triggers=TriggerRule.all_triggers, 

d=dag.dag_id, t=task_id, tr = trigger_rule)) 

 

self.trigger_rule = trigger_rule 

self.depends_on_past = depends_on_past 

self.wait_for_downstream = wait_for_downstream 

if wait_for_downstream: 

self.depends_on_past = True 

 

if schedule_interval: 

logging.warning( 

"schedule_interval is used for {}, though it has " 

"been deprecated as a task parameter, you need to " 

"specify it as a DAG parameter instead".format(self)) 

self._schedule_interval = schedule_interval 

self.retries = retries 

self.queue = queue 

self.pool = pool 

self.sla = sla 

self.execution_timeout = execution_timeout 

self.on_failure_callback = on_failure_callback 

self.on_success_callback = on_success_callback 

self.on_retry_callback = on_retry_callback 

if isinstance(retry_delay, timedelta): 

self.retry_delay = retry_delay 

else: 

logging.debug("retry_delay isn't timedelta object, assuming secs") 

self.retry_delay = timedelta(seconds=retry_delay) 

self.params = params or {} # Available in templates! 

self.adhoc = adhoc 

self.priority_weight = priority_weight 

 

# Private attributes 

self._upstream_task_ids = [] 

self._downstream_task_ids = [] 

 

if not dag and _CONTEXT_MANAGER_DAG: 

dag = _CONTEXT_MANAGER_DAG 

if dag: 

self.dag = dag 

 

self._comps = { 

'task_id', 

'dag_id', 

'owner', 

'email', 

'email_on_retry', 

'retry_delay', 

'start_date', 

'schedule_interval', 

'depends_on_past', 

'wait_for_downstream', 

'adhoc', 

'priority_weight', 

'sla', 

'execution_timeout', 

'on_failure_callback', 

'on_success_callback', 

'on_retry_callback', 

} 

 

def __eq__(self, other): 

return ( 

type(self) == type(other) and 

all(self.__dict__.get(c, None) == other.__dict__.get(c, None) 

for c in self._comps)) 

 

def __neq__(self, other): 

return not self == other 

 

def __lt__(self, other): 

return self.task_id < other.task_id 

 

def __hash__(self): 

hash_components = [type(self)] 

for c in self._comps: 

val = getattr(self, c, None) 

try: 

hash(val) 

hash_components.append(val) 

except TypeError: 

hash_components.append(repr(val)) 

return hash(tuple(hash_components)) 

 

# Composing Operators ----------------------------------------------- 

 

def __rshift__(self, other): 

""" 

Implements Self >> Other == self.set_downstream(other) 

 

If "Other" is a DAG, the DAG is assigned to the Operator. 

""" 

if isinstance(other, DAG): 

# if this dag is already assigned, do nothing 

# otherwise, do normal dag assignment 

if not (self.has_dag() and self.dag is other): 

self.dag = other 

else: 

self.set_downstream(other) 

return other 

 

def __lshift__(self, other): 

""" 

Implements Self << Other == self.set_upstream(other) 

 

If "Other" is a DAG, the DAG is assigned to the Operator. 

""" 

if isinstance(other, DAG): 

# if this dag is already assigned, do nothing 

# otherwise, do normal dag assignment 

if not (self.has_dag() and self.dag is other): 

self.dag = other 

else: 

self.set_upstream(other) 

return other 

 

def __rrshift__(self, other): 

""" 

Called for [DAG] >> [Operator] because DAGs don't have 

__rshift__ operators. 

""" 

self.__lshift__(other) 

return self 

 

def __rlshift__(self, other): 

""" 

Called for [DAG] << [Operator] because DAGs don't have 

__lshift__ operators. 

""" 

self.__rshift__(other) 

return self 

 

# /Composing Operators --------------------------------------------- 

 

@property 

def dag(self): 

""" 

Returns the Operator's DAG if set, otherwise raises an error 

""" 

if self.has_dag(): 

return self._dag 

else: 

raise AirflowException( 

'Operator {} has not been assigned to a DAG yet'.format(self)) 

 

@dag.setter 

def dag(self, dag): 

""" 

Operators can be assigned to one DAG, one time. Repeat assignments to 

that same DAG are ok. 

""" 

if not isinstance(dag, DAG): 

raise TypeError( 

'Expected DAG; received {}'.format(dag.__class__.__name__)) 

elif self.has_dag() and self.dag is not dag: 

raise AirflowException( 

"The DAG assigned to {} can not be changed.".format(self)) 

elif self.task_id not in dag.task_dict: 

dag.add_task(self) 

 

self._dag = dag 

 

def has_dag(self): 

""" 

Returns True if the Operator has been assigned to a DAG. 

""" 

return getattr(self, '_dag', None) is not None 

 

@property 

def dag_id(self): 

if self.has_dag(): 

return self.dag.dag_id 

else: 

return 'adhoc_' + self.owner 

 

@property 

def schedule_interval(self): 

""" 

The schedule interval of the DAG always wins over individual tasks so 

that tasks within a DAG always line up. The task still needs a 

schedule_interval as it may not be attached to a DAG. 

""" 

if self.has_dag(): 

return self.dag._schedule_interval 

else: 

return self._schedule_interval 

 

@property 

def priority_weight_total(self): 

return sum([ 

t.priority_weight 

for t in self.get_flat_relatives(upstream=False) 

]) + self.priority_weight 

 

def pre_execute(self, context): 

""" 

This is triggered right before self.execute, it's mostly a hook 

for people deriving operators. 

""" 

pass 

 

def execute(self, context): 

""" 

This is the main method to derive when creating an operator. 

Context is the same dictionary used as when rendering jinja templates. 

 

Refer to get_template_context for more context. 

""" 

raise NotImplementedError() 

 

def post_execute(self, context): 

""" 

This is triggered right after self.execute, it's mostly a hook 

for people deriving operators. 

""" 

pass 

 

def on_kill(self): 

''' 

Override this method to cleanup subprocesses when a task instance 

gets killed. Any use of the threading, subprocess or multiprocessing 

module within an operator needs to be cleaned up or it will leave 

ghost processes behind. 

''' 

pass 

 

def __deepcopy__(self, memo): 

""" 

Hack sorting double chained task lists by task_id to avoid hitting 

max_depth on deepcopy operations. 

""" 

sys.setrecursionlimit(5000) # TODO fix this in a better way 

cls = self.__class__ 

result = cls.__new__(cls) 

memo[id(self)] = result 

 

for k, v in list(self.__dict__.items()): 

if k not in ('user_defined_macros', 'params'): 

setattr(result, k, copy.deepcopy(v, memo)) 

result.params = self.params 

if hasattr(self, 'user_defined_macros'): 

result.user_defined_macros = self.user_defined_macros 

return result 

 

def render_template_from_field(self, attr, content, context, jinja_env): 

''' 

Renders a template from a field. If the field is a string, it will 

simply render the string and return the result. If it is a collection or 

nested set of collections, it will traverse the structure and render 

all strings in it. 

''' 

rt = self.render_template 

if isinstance(content, six.string_types): 

result = jinja_env.from_string(content).render(**context) 

elif isinstance(content, (list, tuple)): 

result = [rt(attr, e, context) for e in content] 

elif isinstance(content, dict): 

result = { 

k: rt("{}[{}]".format(attr, k), v, context) 

for k, v in list(content.items())} 

else: 

param_type = type(content) 

msg = ( 

"Type '{param_type}' used for parameter '{attr}' is " 

"not supported for templating").format(**locals()) 

raise AirflowException(msg) 

return result 

 

def render_template(self, attr, content, context): 

''' 

Renders a template either from a file or directly in a field, and returns 

the rendered result. 

''' 

jinja_env = self.dag.get_template_env() \ 

if hasattr(self, 'dag') \ 

else jinja2.Environment(cache_size=0) 

 

exts = self.__class__.template_ext 

if ( 

isinstance(content, six.string_types) and 

any([content.endswith(ext) for ext in exts])): 

return jinja_env.get_template(content).render(**context) 

else: 

return self.render_template_from_field(attr, content, context, jinja_env) 

 

def prepare_template(self): 

''' 

Hook that is triggered after the templated fields get replaced 

by their content. If you need your operator to alter the 

content of the file before the template is rendered, 

it should override this method to do so. 

''' 

pass 

 

def resolve_template_files(self): 

# Getting the content of files for template_field / template_ext 

for attr in self.template_fields: 

content = getattr(self, attr) 

if (content and isinstance(content, six.string_types) and 

any([content.endswith(ext) for ext in self.template_ext])): 

env = self.dag.get_template_env() 

try: 

setattr(self, attr, env.loader.get_source(env, content)[0]) 

except Exception as e: 

logging.exception(e) 

self.prepare_template() 

 

@property 

def upstream_list(self): 

"""@property: list of tasks directly upstream""" 

return [self.dag.get_task(tid) for tid in self._upstream_task_ids] 

 

@property 

def upstream_task_ids(self): 

return self._upstream_task_ids 

 

@property 

def downstream_list(self): 

"""@property: list of tasks directly downstream""" 

return [self.dag.get_task(tid) for tid in self._downstream_task_ids] 

 

@property 

def downstream_task_ids(self): 

return self._downstream_task_ids 

 

def clear( 

self, start_date=None, end_date=None, 

upstream=False, downstream=False): 

""" 

Clears the state of task instances associated with the task, following 

the parameters specified. 

""" 

session = settings.Session() 

 

TI = TaskInstance 

qry = session.query(TI).filter(TI.dag_id == self.dag_id) 

 

if start_date: 

qry = qry.filter(TI.execution_date >= start_date) 

if end_date: 

qry = qry.filter(TI.execution_date <= end_date) 

 

tasks = [self.task_id] 

 

if upstream: 

tasks += [ 

t.task_id for t in self.get_flat_relatives(upstream=True)] 

 

if downstream: 

tasks += [ 

t.task_id for t in self.get_flat_relatives(upstream=False)] 

 

qry = qry.filter(TI.task_id.in_(tasks)) 

 

count = qry.count() 

clear_task_instances(qry, session) 

 

session.commit() 

session.close() 

return count 

 

def get_task_instances(self, session, start_date=None, end_date=None): 

""" 

Get a set of task instance related to this task for a specific date 

range. 

""" 

TI = TaskInstance 

end_date = end_date or datetime.now() 

return session.query(TI).filter( 

TI.dag_id == self.dag_id, 

TI.task_id == self.task_id, 

TI.execution_date >= start_date, 

TI.execution_date <= end_date, 

).order_by(TI.execution_date).all() 

 

def get_flat_relatives(self, upstream=False, l=None): 

""" 

Get a flat list of relatives, either upstream or downstream. 

""" 

if not l: 

l = [] 

for t in self.get_direct_relatives(upstream): 

if not is_in(t, l): 

l.append(t) 

t.get_flat_relatives(upstream, l) 

return l 

 

def detect_downstream_cycle(self, task=None): 

""" 

When invoked, this routine will raise an exception if a cycle is 

detected downstream from self. It is invoked when tasks are added to 

the DAG to detect cycles. 

""" 

if not task: 

task = self 

for t in self.get_direct_relatives(): 

if task is t: 

msg = "Cycle detected in DAG. Faulty task: {0}".format(task) 

raise AirflowException(msg) 

else: 

t.detect_downstream_cycle(task=task) 

return False 

 

def run( 

self, 

start_date=None, 

end_date=None, 

ignore_dependencies=False, 

ignore_first_depends_on_past=False, 

force=False, 

mark_success=False): 

""" 

Run a set of task instances for a date range. 

""" 

start_date = start_date or self.start_date 

end_date = end_date or self.end_date or datetime.now() 

 

for dt in self.dag.date_range(start_date, end_date=end_date): 

TaskInstance(self, dt).run( 

mark_success=mark_success, 

ignore_dependencies=ignore_dependencies, 

ignore_depends_on_past=( 

dt == start_date and ignore_first_depends_on_past), 

force=force,) 

 

def dry_run(self): 

logging.info('Dry run') 

for attr in self.template_fields: 

content = getattr(self, attr) 

if content and isinstance(content, six.string_types): 

logging.info('Rendering template for {0}'.format(attr)) 

logging.info(content) 

 

def get_direct_relatives(self, upstream=False): 

""" 

Get the direct relatives to the current task, upstream or 

downstream. 

""" 

if upstream: 

return self.upstream_list 

else: 

return self.downstream_list 

 

def __repr__(self): 

return "<Task({self.__class__.__name__}): {self.task_id}>".format( 

self=self) 

 

@property 

def task_type(self): 

return self.__class__.__name__ 

 

def append_only_new(self, l, item): 

if any([item is t for t in l]): 

raise AirflowException( 

'Dependency {self}, {item} already registered' 

''.format(**locals())) 

else: 

l.append(item) 

 

def _set_relatives(self, task_or_task_list, upstream=False): 

try: 

task_list = list(task_or_task_list) 

except TypeError: 

task_list = [task_or_task_list] 

 

for t in task_list: 

if not isinstance(t, BaseOperator): 

raise AirflowException( 

"Relationships can only be set between " 

"Operators; received {}".format(t.__class__.__name__)) 

 

# relationships can only be set if the tasks share a single DAG. Tasks 

# without a DAG are assigned to that DAG. 

dags = set(t.dag for t in [self] + task_list if t.has_dag()) 

 

if len(dags) > 1: 

raise AirflowException( 

'Tried to set relationships between tasks in ' 

'more than one DAG: {}'.format(dags)) 

elif len(dags) == 1: 

dag = list(dags)[0] 

else: 

raise AirflowException( 

"Tried to create relationships between tasks that don't have " 

"DAGs yet. Set the DAG for at least one " 

"task and try again: {}".format([self] + task_list)) 

 

if dag and not self.has_dag(): 

self.dag = dag 

 

for task in task_list: 

if dag and not task.has_dag(): 

task.dag = dag 

if upstream: 

task.append_only_new(task._downstream_task_ids, self.task_id) 

self.append_only_new(self._upstream_task_ids, task.task_id) 

else: 

self.append_only_new(self._downstream_task_ids, task.task_id) 

task.append_only_new(task._upstream_task_ids, self.task_id) 

 

self.detect_downstream_cycle() 

 

def set_downstream(self, task_or_task_list): 

""" 

Set a task, or a task task to be directly downstream from the current 

task. 

""" 

self._set_relatives(task_or_task_list, upstream=False) 

 

def set_upstream(self, task_or_task_list): 

""" 

Set a task, or a task task to be directly upstream from the current 

task. 

""" 

self._set_relatives(task_or_task_list, upstream=True) 

 

def xcom_push( 

self, 

context, 

key, 

value, 

execution_date=None): 

""" 

See TaskInstance.xcom_push() 

""" 

context['ti'].xcom_push( 

key=key, 

value=value, 

execution_date=execution_date) 

 

def xcom_pull( 

self, 

context, 

task_ids, 

dag_id=None, 

key=XCOM_RETURN_KEY, 

include_prior_dates=None): 

""" 

See TaskInstance.xcom_pull() 

""" 

return context['ti'].xcom_pull( 

key=key, 

task_ids=task_ids, 

dag_id=dag_id, 

include_prior_dates=include_prior_dates) 

 

 

class DagModel(Base): 

 

__tablename__ = "dag" 

""" 

These items are stored in the database for state related information 

""" 

dag_id = Column(String(ID_LEN), primary_key=True) 

# A DAG can be paused from the UI / DB 

# Set this default value of is_paused based on a configuration value! 

is_paused_at_creation = configuration.getboolean('core', 'dags_are_paused_at_creation') 

is_paused = Column(Boolean, default=is_paused_at_creation) 

# Whether the DAG is a subdag 

is_subdag = Column(Boolean, default=False) 

# Whether that DAG was seen on the last DagBag load 

is_active = Column(Boolean, default=False) 

# Last time the scheduler started 

last_scheduler_run = Column(DateTime) 

# Last time this DAG was pickled 

last_pickled = Column(DateTime) 

# When the DAG received a refreshed signal last, used to know when 

# we need to force refresh 

last_expired = Column(DateTime) 

# Whether (one of) the scheduler is scheduling this DAG at the moment 

scheduler_lock = Column(Boolean) 

# Foreign key to the latest pickle_id 

pickle_id = Column(Integer) 

# The location of the file containing the DAG object 

fileloc = Column(String(2000)) 

# String representing the owners 

owners = Column(String(2000)) 

 

def __repr__(self): 

return "<DAG: {self.dag_id}>".format(self=self) 

 

@classmethod 

def get_current(cls, dag_id): 

session = settings.Session() 

obj = session.query(cls).filter(cls.dag_id == dag_id).first() 

session.expunge_all() 

session.commit() 

session.close() 

return obj 

 

 

@functools.total_ordering 

class DAG(LoggingMixin): 

""" 

A dag (directed acyclic graph) is a collection of tasks with directional 

dependencies. A dag also has a schedule, a start end an end date 

(optional). For each schedule, (say daily or hourly), the DAG needs to run 

each individual tasks as their dependencies are met. Certain tasks have 

the property of depending on their own past, meaning that they can't run 

until their previous schedule (and upstream tasks) are completed. 

 

DAGs essentially act as namespaces for tasks. A task_id can only be 

added once to a DAG. 

 

:param dag_id: The id of the DAG 

:type dag_id: string 

:param schedule_interval: Defines how often that DAG runs, this 

timedelta object gets added to your latest task instance's 

execution_date to figure out the next schedule 

:type schedule_interval: datetime.timedelta or 

dateutil.relativedelta.relativedelta or str that acts as a cron 

expression 

:param start_date: The timestamp from which the scheduler will 

attempt to backfill 

:type start_date: datetime.datetime 

:param end_date: A date beyond which your DAG won't run, leave to None 

for open ended scheduling 

:type end_date: datetime.datetime 

:param template_searchpath: This list of folders (non relative) 

defines where jinja will look for your templates. Order matters. 

Note that jinja/airflow includes the path of your DAG file by 

default 

:type template_searchpath: string or list of stings 

:param user_defined_macros: a dictionary of macros that will be exposed 

in your jinja templates. For example, passing ``dict(foo='bar')`` 

to this argument allows you to ``{{ foo }}`` in all jinja 

templates related to this DAG. Note that you can pass any 

type of object here. 

:type user_defined_macros: dict 

:param default_args: A dictionary of default parameters to be used 

as constructor keyword parameters when initialising operators. 

Note that operators have the same hook, and precede those defined 

here, meaning that if your dict contains `'depends_on_past': True` 

here and `'depends_on_past': False` in the operator's call 

`default_args`, the actual value will be `False`. 

:type default_args: dict 

:param params: a dictionary of DAG level parameters that are made 

accessible in templates, namespaced under `params`. These 

params can be overridden at the task level. 

:type params: dict 

:param concurrency: the number of task instances allowed to run 

concurrently 

:type concurrency: int 

:param max_active_runs: maximum number of active DAG runs, beyond this 

number of DAG runs in a running state, the scheduler won't create 

new active DAG runs 

:type max_active_runs: int 

:param dagrun_timeout: specify how long a DagRun should be up before 

timing out / failing, so that new DagRuns can be created 

:type dagrun_timeout: datetime.timedelta 

:param sla_miss_callback: specify a function to call when reporting SLA 

timeouts. 

:type sla_miss_callback: types.FunctionType 

""" 

 

def __init__( 

self, dag_id, 

schedule_interval=timedelta(days=1), 

start_date=None, end_date=None, 

full_filepath=None, 

template_searchpath=None, 

user_defined_macros=None, 

default_args=None, 

concurrency=configuration.getint('core', 'dag_concurrency'), 

max_active_runs=configuration.getint( 

'core', 'max_active_runs_per_dag'), 

dagrun_timeout=None, 

sla_miss_callback=None, 

params=None): 

 

self.user_defined_macros = user_defined_macros 

self.default_args = default_args or {} 

self.params = params or {} 

 

# merging potentially conflicting default_args['params'] into params 

if 'params' in self.default_args: 

self.params.update(self.default_args['params']) 

del self.default_args['params'] 

 

validate_key(dag_id) 

self.task_dict = dict() 

self.dag_id = dag_id 

self.start_date = start_date 

self.end_date = end_date 

self.schedule_interval = schedule_interval 

if schedule_interval in cron_presets: 

self._schedule_interval = cron_presets.get(schedule_interval) 

elif schedule_interval == '@once': 

self._schedule_interval = None 

else: 

self._schedule_interval = schedule_interval 

self.full_filepath = full_filepath if full_filepath else '' 

if isinstance(template_searchpath, six.string_types): 

template_searchpath = [template_searchpath] 

self.template_searchpath = template_searchpath 

self.parent_dag = None # Gets set when DAGs are loaded 

self.last_loaded = datetime.now() 

self.safe_dag_id = dag_id.replace('.', '__dot__') 

self.concurrency = concurrency 

self.max_active_runs = max_active_runs 

self.dagrun_timeout = dagrun_timeout 

self.sla_miss_callback = sla_miss_callback 

 

self._comps = { 

'dag_id', 

'task_ids', 

'parent_dag', 

'start_date', 

'schedule_interval', 

'full_filepath', 

'template_searchpath', 

'last_loaded', 

} 

 

def __repr__(self): 

return "<DAG: {self.dag_id}>".format(self=self) 

 

def __eq__(self, other): 

return ( 

type(self) == type(other) and 

all(self.__dict__.get(c, None) == other.__dict__.get(c, None) 

for c in self._comps)) 

 

def __neq__(self, other): 

return not self == other 

 

def __lt__(self, other): 

return self.dag_id < other.dag_id 

 

def __hash__(self): 

hash_components = [type(self)] 

for c in self._comps: 

# task_ids returns a list and lists can't be hashed 

if c == 'task_ids': 

val = tuple(self.task_dict.keys()) 

else: 

val = getattr(self, c, None) 

try: 

hash(val) 

hash_components.append(val) 

except TypeError: 

hash_components.append(repr(val)) 

return hash(tuple(hash_components)) 

 

# Context Manager ----------------------------------------------- 

 

def __enter__(self): 

global _CONTEXT_MANAGER_DAG 

self._old_context_manager_dag = _CONTEXT_MANAGER_DAG 

_CONTEXT_MANAGER_DAG = self 

return self 

 

def __exit__(self, _type, _value, _tb): 

global _CONTEXT_MANAGER_DAG 

_CONTEXT_MANAGER_DAG = self._old_context_manager_dag 

 

# /Context Manager ---------------------------------------------- 

 

def date_range(self, start_date, num=None, end_date=datetime.now()): 

if num: 

end_date = None 

return utils_date_range( 

start_date=start_date, end_date=end_date, 

num=num, delta=self._schedule_interval) 

 

def following_schedule(self, dttm): 

if isinstance(self._schedule_interval, six.string_types): 

cron = croniter(self._schedule_interval, dttm) 

return cron.get_next(datetime) 

elif isinstance(self._schedule_interval, timedelta): 

return dttm + self._schedule_interval 

 

def previous_schedule(self, dttm): 

if isinstance(self._schedule_interval, six.string_types): 

cron = croniter(self._schedule_interval, dttm) 

return cron.get_prev(datetime) 

elif isinstance(self._schedule_interval, timedelta): 

return dttm - self._schedule_interval 

 

def normalize_schedule(self, dttm): 

""" 

Returns dttm + interval unless dttm is first interval then it returns dttm 

""" 

following = self.following_schedule(dttm) 

 

# in case of @once 

if not following: 

return dttm 

 

if self.previous_schedule(following) != dttm: 

return following 

 

return dttm 

 

@property 

def tasks(self): 

return list(self.task_dict.values()) 

 

@tasks.setter 

def tasks(self, val): 

raise AttributeError( 

'DAG.tasks can not be modified. Use dag.add_task() instead.') 

 

@property 

def task_ids(self): 

return list(self.task_dict.keys()) 

 

@property 

def active_task_ids(self): 

return list(k for k, v in self.task_dict.items() if not v.adhoc) 

 

@property 

def active_tasks(self): 

return [t for t in self.tasks if not t.adhoc] 

 

@property 

def filepath(self): 

""" 

File location of where the dag object is instantiated 

""" 

fn = self.full_filepath.replace(DAGS_FOLDER + '/', '') 

fn = fn.replace(os.path.dirname(__file__) + '/', '') 

return fn 

 

@property 

def folder(self): 

""" 

Folder location of where the dag object is instantiated 

""" 

return os.path.dirname(self.full_filepath) 

 

@property 

def owner(self): 

return ", ".join(list(set([t.owner for t in self.tasks]))) 

 

@property 

@provide_session 

def concurrency_reached(self, session=None): 

""" 

Returns a boolean indicating whether the concurrency limit for this DAG 

has been reached 

""" 

TI = TaskInstance 

qry = session.query(func.count(TI.task_id)).filter( 

TI.dag_id == self.dag_id, 

TI.task_id.in_(self.task_ids), 

TI.state == State.RUNNING, 

) 

return qry.scalar() >= self.concurrency 

 

@property 

@provide_session 

def is_paused(self, session=None): 

""" 

Returns a boolean indicating whether this DAG is paused 

""" 

qry = session.query(DagModel).filter( 

DagModel.dag_id == self.dag_id) 

return qry.value('is_paused') 

 

@property 

def latest_execution_date(self): 

""" 

Returns the latest date for which at least one task instance exists 

""" 

TI = TaskInstance 

session = settings.Session() 

execution_date = session.query(func.max(TI.execution_date)).filter( 

TI.dag_id == self.dag_id, 

TI.task_id.in_(self.task_ids) 

).scalar() 

session.commit() 

session.close() 

return execution_date 

 

@property 

def subdags(self): 

""" 

Returns a list of the subdag objects associated to this DAG 

""" 

# Check SubDag for class but don't check class directly, see 

# https://github.com/airbnb/airflow/issues/1168 

l = [] 

for task in self.tasks: 

if ( 

task.__class__.__name__ == 'SubDagOperator' and 

hasattr(task, 'subdag')): 

l.append(task.subdag) 

l += task.subdag.subdags 

return l 

 

def resolve_template_files(self): 

for t in self.tasks: 

t.resolve_template_files() 

 

def crawl_for_tasks(objects): 

""" 

Typically called at the end of a script by passing globals() as a 

parameter. This allows to not explicitly add every single task to the 

dag explicitly. 

""" 

raise NotImplementedError("") 

 

def get_template_env(self): 

''' 

Returns a jinja2 Environment while taking into account the DAGs 

template_searchpath and user_defined_macros 

''' 

searchpath = [self.folder] 

if self.template_searchpath: 

searchpath += self.template_searchpath 

 

env = jinja2.Environment( 

loader=jinja2.FileSystemLoader(searchpath), 

extensions=["jinja2.ext.do"], 

cache_size=0) 

if self.user_defined_macros: 

env.globals.update(self.user_defined_macros) 

 

return env 

 

def set_dependency(self, upstream_task_id, downstream_task_id): 

""" 

Simple utility method to set dependency between two tasks that 

already have been added to the DAG using add_task() 

""" 

self.get_task(upstream_task_id).set_downstream( 

self.get_task(downstream_task_id)) 

 

def get_task_instances( 

self, session, start_date=None, end_date=None, state=None): 

TI = TaskInstance 

if not start_date: 

start_date = (datetime.today()-timedelta(30)).date() 

start_date = datetime.combine(start_date, datetime.min.time()) 

end_date = end_date or datetime.now() 

tis = session.query(TI).filter( 

TI.dag_id == self.dag_id, 

TI.execution_date >= start_date, 

TI.execution_date <= end_date, 

TI.task_id.in_([t.task_id for t in self.tasks]), 

) 

if state: 

tis = tis.filter(TI.state == state) 

tis = tis.all() 

return tis 

 

@property 

def roots(self): 

return [t for t in self.tasks if not t.downstream_list] 

 

@provide_session 

def set_dag_runs_state( 

self, start_date, end_date, state=State.RUNNING, session=None): 

dates = utils_date_range(start_date, end_date) 

drs = session.query(DagModel).filter_by(dag_id=self.dag_id).all() 

for dr in drs: 

dr.state = State.RUNNING 

 

def clear( 

self, start_date=None, end_date=None, 

only_failed=False, 

only_running=False, 

confirm_prompt=False, 

include_subdags=True, 

reset_dag_runs=True, 

dry_run=False): 

session = settings.Session() 

""" 

Clears a set of task instances associated with the current dag for 

a specified date range. 

""" 

TI = TaskInstance 

tis = session.query(TI) 

if include_subdags: 

# Crafting the right filter for dag_id and task_ids combo 

conditions = [] 

for dag in self.subdags + [self]: 

conditions.append( 

TI.dag_id.like(dag.dag_id) & TI.task_id.in_(dag.task_ids) 

) 

tis = tis.filter(or_(*conditions)) 

else: 

tis = session.query(TI).filter(TI.dag_id == self.dag_id) 

tis = tis.filter(TI.task_id.in_(self.task_ids)) 

 

if start_date: 

tis = tis.filter(TI.execution_date >= start_date) 

if end_date: 

tis = tis.filter(TI.execution_date <= end_date) 

if only_failed: 

tis = tis.filter(TI.state == State.FAILED) 

if only_running: 

tis = tis.filter(TI.state == State.RUNNING) 

 

if dry_run: 

tis = tis.all() 

session.expunge_all() 

return tis 

 

count = tis.count() 

do_it = True 

if count == 0: 

print("Nothing to clear.") 

return 0 

if confirm_prompt: 

ti_list = "\n".join([str(t) for t in tis]) 

question = ( 

"You are about to delete these {count} tasks:\n" 

"{ti_list}\n\n" 

"Are you sure? (yes/no): ").format(**locals()) 

do_it = utils.helpers.ask_yesno(question) 

 

if do_it: 

clear_task_instances(tis, session) 

if reset_dag_runs: 

self.set_dag_runs_state(start_date, end_date, session=session) 

else: 

count = 0 

print("Bail. Nothing was cleared.") 

 

session.commit() 

session.close() 

return count 

 

def __deepcopy__(self, memo): 

# Swiwtcharoo to go around deepcopying objects coming through the 

# backdoor 

cls = self.__class__ 

result = cls.__new__(cls) 

memo[id(self)] = result 

for k, v in list(self.__dict__.items()): 

if k not in ('user_defined_macros', 'params'): 

setattr(result, k, copy.deepcopy(v, memo)) 

 

result.user_defined_macros = self.user_defined_macros 

result.params = self.params 

return result 

 

def sub_dag(self, task_regex, include_downstream=False, 

include_upstream=True): 

""" 

Returns a subset of the current dag as a deep copy of the current dag 

based on a regex that should match one or many tasks, and includes 

upstream and downstream neighbours based on the flag passed. 

""" 

 

dag = copy.deepcopy(self) 

 

regex_match = [ 

t for t in dag.tasks if re.findall(task_regex, t.task_id)] 

also_include = [] 

for t in regex_match: 

if include_downstream: 

also_include += t.get_flat_relatives(upstream=False) 

if include_upstream: 

also_include += t.get_flat_relatives(upstream=True) 

 

# Compiling the unique list of tasks that made the cut 

dag.task_dict = {t.task_id: t for t in regex_match + also_include} 

for t in dag.tasks: 

# Removing upstream/downstream references to tasks that did not 

# made the cut 

t._upstream_task_ids = [ 

tid for tid in t._upstream_task_ids if tid in dag.task_ids] 

t._downstream_task_ids = [ 

tid for tid in t._downstream_task_ids if tid in dag.task_ids] 

return dag 

 

def has_task(self, task_id): 

return task_id in (t.task_id for t in self.tasks) 

 

def get_task(self, task_id): 

if task_id in self.task_dict: 

return self.task_dict[task_id] 

raise AirflowException("Task {task_id} not found".format(**locals())) 

 

@provide_session 

def pickle_info(self, session=None): 

d = {} 

d['is_picklable'] = True 

try: 

dttm = datetime.now() 

pickled = pickle.dumps(self) 

d['pickle_len'] = len(pickled) 

d['pickling_duration'] = "{}".format(datetime.now() - dttm) 

except Exception as e: 

logging.exception(e) 

d['is_picklable'] = False 

d['stacktrace'] = traceback.format_exc() 

return d 

 

@provide_session 

def pickle(self, session=None): 

dag = session.query( 

DagModel).filter(DagModel.dag_id == self.dag_id).first() 

dp = None 

if dag and dag.pickle_id: 

dp = session.query(DagPickle).filter( 

DagPickle.id == dag.pickle_id).first() 

if not dp or dp.pickle != self: 

dp = DagPickle(dag=self) 

session.add(dp) 

self.last_pickled = datetime.now() 

session.commit() 

self.pickle_id = dp.id 

 

return dp 

 

def tree_view(self): 

""" 

Shows an ascii tree representation of the DAG 

""" 

def get_downstream(task, level=0): 

print((" " * level * 4) + str(task)) 

level += 1 

for t in task.upstream_list: 

get_downstream(t, level) 

 

for t in self.roots: 

get_downstream(t) 

 

def add_task(self, task): 

''' 

Add a task to the DAG 

 

:param task: the task you want to add 

:type task: task 

''' 

if not self.start_date and not task.start_date: 

raise AirflowException("Task is missing the start_date parameter") 

if not task.start_date: 

task.start_date = self.start_date 

 

if task.task_id in self.task_dict: 

#TODO raise an error in Airflow 2.0 

warnings.warn( 

'The requested task could not be added to the DAG because a ' 

'task with task_id {} is already in the DAG. Starting in ' 

'Airflow 2.0, trying to overwrite a task will raise an ' 

'exception.'.format(task.task_id), 

category=PendingDeprecationWarning) 

else: 

self.tasks.append(task) 

self.task_dict[task.task_id] = task 

task.dag = self 

 

self.task_count = len(self.tasks) 

 

def add_tasks(self, tasks): 

''' 

Add a list of tasks to the DAG 

 

:param task: a lit of tasks you want to add 

:type task: list of tasks 

''' 

for task in tasks: 

self.add_task(task) 

 

def db_merge(self): 

BO = BaseOperator 

session = settings.Session() 

tasks = session.query(BO).filter(BO.dag_id == self.dag_id).all() 

for t in tasks: 

session.delete(t) 

session.commit() 

session.merge(self) 

session.commit() 

 

def run( 

self, 

start_date=None, 

end_date=None, 

mark_success=False, 

include_adhoc=False, 

local=False, 

executor=None, 

donot_pickle=configuration.getboolean('core', 'donot_pickle'), 

ignore_dependencies=False, 

ignore_first_depends_on_past=False, 

pool=None): 

""" 

Runs the DAG. 

""" 

from airflow.jobs import BackfillJob 

if not executor and local: 

executor = LocalExecutor() 

elif not executor: 

executor = DEFAULT_EXECUTOR 

job = BackfillJob( 

self, 

start_date=start_date, 

end_date=end_date, 

mark_success=mark_success, 

include_adhoc=include_adhoc, 

executor=executor, 

donot_pickle=donot_pickle, 

ignore_dependencies=ignore_dependencies, 

ignore_first_depends_on_past=ignore_first_depends_on_past, 

pool=pool) 

job.run() 

 

def cli(self): 

""" 

Exposes a CLI specific to this DAG 

""" 

from airflow.bin import cli 

parser = cli.CLIFactory.get_parser(dag_parser=True) 

args = parser.parse_args() 

args.func(args, self) 

 

@provide_session 

def create_dagrun(self, 

run_id, 

execution_date, 

state, 

start_date=None, 

external_trigger=False, 

conf=None, 

session=None): 

""" 

Creates a dag run from this dag including the tasks associated with this dag. Returns the dag 

run. 

:param run_id: defines the the run id for this dag run 

:type run_id: string 

:param execution_date: the execution date of this dag run 

:type execution_date: datetime 

:param state: the state of the dag run 

:type state: State 

:param start_date: the date this dag run should be evaluated 

:type state_date: datetime 

:param external_trigger: whether this dag run is externally triggered 

:type external_trigger: bool 

:param session: database session 

:type session: Session 

""" 

run = DagRun( 

dag_id=self.dag_id, 

run_id=run_id, 

execution_date=execution_date, 

start_date=start_date, 

external_trigger=external_trigger, 

conf=conf, 

state=state 

) 

session.add(run) 

session.commit() 

 

run.dag = self 

 

# create the associated taskinstances 

# state is None at the moment of creation 

run.verify_integrity(session=session) 

 

run.refresh_from_db() 

return run 

 

 

class Chart(Base): 

__tablename__ = "chart" 

 

id = Column(Integer, primary_key=True) 

label = Column(String(200)) 

conn_id = Column(String(ID_LEN), nullable=False) 

user_id = Column(Integer(), ForeignKey('users.id'), nullable=True) 

chart_type = Column(String(100), default="line") 

sql_layout = Column(String(50), default="series") 

sql = Column(Text, default="SELECT series, x, y FROM table") 

y_log_scale = Column(Boolean) 

show_datatable = Column(Boolean) 

show_sql = Column(Boolean, default=True) 

height = Column(Integer, default=600) 

default_params = Column(String(5000), default="{}") 

owner = relationship( 

"User", cascade=False, cascade_backrefs=False, backref='charts') 

x_is_date = Column(Boolean, default=True) 

iteration_no = Column(Integer, default=0) 

last_modified = Column(DateTime, default=func.now()) 

 

def __repr__(self): 

return self.label 

 

 

class KnownEventType(Base): 

__tablename__ = "known_event_type" 

 

id = Column(Integer, primary_key=True) 

know_event_type = Column(String(200)) 

 

def __repr__(self): 

return self.know_event_type 

 

 

class KnownEvent(Base): 

__tablename__ = "known_event" 

 

id = Column(Integer, primary_key=True) 

label = Column(String(200)) 

start_date = Column(DateTime) 

end_date = Column(DateTime) 

user_id = Column(Integer(), ForeignKey('users.id'),) 

known_event_type_id = Column(Integer(), ForeignKey('known_event_type.id'),) 

reported_by = relationship( 

"User", cascade=False, cascade_backrefs=False, backref='known_events') 

event_type = relationship( 

"KnownEventType", 

cascade=False, 

cascade_backrefs=False, backref='known_events') 

description = Column(Text) 

 

def __repr__(self): 

return self.label 

 

 

class Variable(Base): 

__tablename__ = "variable" 

 

id = Column(Integer, primary_key=True) 

key = Column(String(ID_LEN), unique=True) 

_val = Column('val', Text) 

is_encrypted = Column(Boolean, unique=False, default=False) 

 

def __repr__(self): 

# Hiding the value 

return '{} : {}'.format(self.key, self._val) 

 

def get_val(self): 

if self._val and self.is_encrypted: 

if not ENCRYPTION_ON: 

raise AirflowException( 

"Can't decrypt _val, configuration is missing") 

return FERNET.decrypt(bytes(self._val, 'utf-8')).decode() 

else: 

return self._val 

 

def set_val(self, value): 

if value: 

try: 

self._val = FERNET.encrypt(bytes(value, 'utf-8')).decode() 

self.is_encrypted = True 

except NameError: 

self._val = value 

self.is_encrypted = False 

 

@declared_attr 

def val(cls): 

return synonym('_val', 

descriptor=property(cls.get_val, cls.set_val)) 

@classmethod 

@provide_session 

def get(cls, key, default_var=None, deserialize_json=False, session=None): 

obj = session.query(cls).filter(cls.key == key).first() 

if obj is None: 

if default_var is not None: 

return default_var 

else: 

raise ValueError('Variable {} does not exist'.format(key)) 

else: 

if deserialize_json: 

return json.loads(obj.val) 

else: 

return obj.val 

 

@classmethod 

@provide_session 

def set(cls, key, value, serialize_json=False, session=None): 

 

if serialize_json: 

stored_value = json.dumps(value) 

else: 

stored_value = value 

 

session.query(cls).filter(cls.key == key).delete() 

session.add(Variable(key=key, val=stored_value)) 

session.flush() 

 

 

class XCom(Base): 

""" 

Base class for XCom objects. 

""" 

__tablename__ = "xcom" 

 

id = Column(Integer, primary_key=True) 

key = Column(String(512)) 

value = Column(PickleType(pickler=dill)) 

timestamp = Column( 

DateTime, default=func.now(), nullable=False) 

execution_date = Column(DateTime, nullable=False) 

 

# source information 

task_id = Column(String(ID_LEN), nullable=False) 

dag_id = Column(String(ID_LEN), nullable=False) 

 

def __repr__(self): 

return '<XCom "{key}" ({task_id} @ {execution_date})>'.format( 

key=self.key, 

task_id=self.task_id, 

execution_date=self.execution_date) 

 

@classmethod 

@provide_session 

def set( 

cls, 

key, 

value, 

execution_date, 

task_id, 

dag_id, 

session=None): 

""" 

Store an XCom value. 

""" 

session.expunge_all() 

 

# remove any duplicate XComs 

session.query(cls).filter( 

cls.key == key, 

cls.execution_date == execution_date, 

cls.task_id == task_id, 

cls.dag_id == dag_id).delete() 

 

# insert new XCom 

session.add(XCom( 

key=key, 

value=value, 

execution_date=execution_date, 

task_id=task_id, 

dag_id=dag_id)) 

 

session.commit() 

 

@classmethod 

@provide_session 

def get_one( 

cls, 

execution_date, 

key=None, 

task_id=None, 

dag_id=None, 

include_prior_dates=False, 

session=None): 

""" 

Retrieve an XCom value, optionally meeting certain criteria 

""" 

filters = [] 

if key: 

filters.append(cls.key == key) 

if task_id: 

filters.append(cls.task_id == task_id) 

if dag_id: 

filters.append(cls.dag_id == dag_id) 

if include_prior_dates: 

filters.append(cls.execution_date <= execution_date) 

else: 

filters.append(cls.execution_date == execution_date) 

 

query = ( 

session.query(cls.value) 

.filter(and_(*filters)) 

.order_by(cls.execution_date.desc(), cls.timestamp.desc()) 

.limit(1)) 

 

result = query.first() 

if result: 

return result.value 

 

@classmethod 

@provide_session 

def get_many( 

cls, 

execution_date, 

key=None, 

task_ids=None, 

dag_ids=None, 

include_prior_dates=False, 

limit=100, 

session=None): 

""" 

Retrieve an XCom value, optionally meeting certain criteria 

""" 

filters = [] 

if key: 

filters.append(cls.key == key) 

if task_ids: 

filters.append(cls.task_id.in_(as_tuple(task_ids))) 

if dag_ids: 

filters.append(cls.dag_id.in_(as_tuple(dag_ids))) 

if include_prior_dates: 

filters.append(cls.execution_date <= execution_date) 

else: 

filters.append(cls.execution_date == execution_date) 

 

query = ( 

session.query(cls) 

.filter(and_(*filters)) 

.order_by(cls.execution_date.desc(), cls.timestamp.desc()) 

.limit(limit)) 

 

return query.all() 

 

@classmethod 

@provide_session 

def delete(cls, xcoms, session=None): 

if isinstance(xcoms, XCom): 

xcoms = [xcoms] 

for xcom in xcoms: 

if not isinstance(xcom, XCom): 

raise TypeError('Expected XCom; received {}'.format( 

xcom.__class__.__name__)) 

session.delete(xcom) 

session.commit() 

 

 

class DagRun(Base): 

""" 

DagRun describes an instance of a Dag. It can be created 

by the scheduler (for regular runs) or by an external trigger 

""" 

__tablename__ = "dag_run" 

 

ID_PREFIX = 'scheduled__' 

ID_FORMAT_PREFIX = ID_PREFIX + '{0}' 

 

id = Column(Integer, primary_key=True) 

dag_id = Column(String(ID_LEN)) 

execution_date = Column(DateTime, default=func.now()) 

start_date = Column(DateTime, default=func.now()) 

end_date = Column(DateTime) 

state = Column(String(50), default=State.RUNNING) 

run_id = Column(String(ID_LEN)) 

external_trigger = Column(Boolean, default=True) 

conf = Column(PickleType) 

 

dag = None 

 

__table_args__ = ( 

Index('dr_run_id', dag_id, run_id, unique=True), 

) 

 

def __repr__(self): 

return ( 

'<DagRun {dag_id} @ {execution_date}: {run_id}, ' 

'externally triggered: {external_trigger}>' 

).format( 

dag_id=self.dag_id, 

execution_date=self.execution_date, 

run_id=self.run_id, 

external_trigger=self.external_trigger) 

 

@classmethod 

def id_for_date(klass, date, prefix=ID_FORMAT_PREFIX): 

return prefix.format(date.isoformat()[:19]) 

 

@provide_session 

def refresh_from_db(self, session=None): 

""" 

Reloads the current dagrun from the database 

:param session: database session 

""" 

DR = DagRun 

 

dr = session.query(DR).filter( 

DR.dag_id == self.dag_id, 

DR.execution_date == self.execution_date, 

DR.run_id == self.run_id 

).one() 

if dr: 

self.id = dr.id 

self.state = dr.state 

 

@staticmethod 

@provide_session 

def find(dag_id=None, run_id=None, execution_date=None, 

state=None, external_trigger=None, session=None): 

""" 

Returns a set of dag runs for the given search criteria. 

:param dag_id: the dag_id to find dag runs for 

:type dag_id: integer, list 

:param run_id: defines the the run id for this dag run 

:type run_id: string 

:param execution_date: the execution date 

:type execution_date: datetime 

:param state: the state of the dag run 

:type state: State 

:param external_trigger: whether this dag run is externally triggered 

:type external_trigger: bool 

:param session: database session 

:type session: Session 

""" 

DR = DagRun 

 

qry = session.query(DR) 

if dag_id: 

qry = qry.filter(DR.dag_id == dag_id) 

if run_id: 

qry = qry.filter(DR.run_id == run_id) 

if execution_date: 

qry = qry.filter(DR.execution_date == execution_date) 

if state: 

qry = qry.filter(DR.state == state) 

if external_trigger: 

qry = qry.filter(DR.external_trigger == external_trigger) 

 

dr = qry.order_by(DR.execution_date).all() 

 

return dr 

 

@provide_session 

def get_task_instances(self, state=None, session=None): 

""" 

Returns the task instances for this dag run 

""" 

TI = TaskInstance 

tis = session.query(TI).filter( 

TI.dag_id == self.dag_id, 

TI.execution_date == self.execution_date, 

) 

if state: 

if isinstance(state, six.string_types): 

tis = tis.filter(TI.state == state) 

else: 

# this is required to deal with NULL values 

if None in state: 

tis = tis.filter(or_( 

TI.state.in_(state), 

TI.state.is_(None)) 

) 

else: 

tis = tis.filter(TI.state.in_(state)) 

 

return tis.all() 

 

@provide_session 

def get_task_instance(self, task_id, session=None): 

""" 

Returns the task instance specified by task_id for this dag run 

:param task_id: the task id 

""" 

 

TI = TaskInstance 

ti = session.query(TI).filter( 

TI.dag_id == self.dag_id, 

TI.execution_date == self.execution_date, 

TI.task_id == task_id 

).one() 

 

return ti 

 

def get_dag(self): 

""" 

Returns the Dag associated with this DagRun 

:param session: database session 

:return: DAG 

""" 

if not self.dag: 

raise AirflowException("The DAG (.dag) for {} needs to be set" 

.format(self)) 

 

return self.dag 

 

@provide_session 

def update_state(self, session=None): 

""" 

Determines the overall state of the DagRun based on the state 

of its TaskInstances. 

:returns State: 

""" 

 

dag = self.get_dag() 

tis = self.get_task_instances(session=session) 

 

logging.info("Updating state for {} considering {} task(s)" 

.format(self, len(tis))) 

 

for ti in tis: 

# skip in db? 

if ti.state == State.REMOVED: 

tis.remove(ti) 

else: 

ti.task = dag.get_task(ti.task_id) 

 

# pre-calculate 

# db is faster 

start_dttm = datetime.now() 

unfinished_tasks = self.get_task_instances( 

state=State.unfinished(), 

session=session 

) 

none_depends_on_past = all(t.task.depends_on_past for t in unfinished_tasks) 

 

# small speed up 

if unfinished_tasks and none_depends_on_past: 

# todo: this can actually get pretty slow: one task costs between 0.01-015s 

no_dependencies_met = all(not t.are_dependencies_met(session=session) 

for t in unfinished_tasks) 

 

duration = (datetime.now() - start_dttm).total_seconds() * 1000 

Stats.timing("dagrun.dependency-check.{}.{}". 

format(self.dag_id, self.execution_date), duration) 

 

# future: remove the check on adhoc tasks (=active_tasks) 

if len(tis) == len(dag.active_tasks): 

# if any roots failed, the run failed 

root_ids = [t.task_id for t in dag.roots] 

roots = [t for t in tis if t.task_id in root_ids] 

 

if any(r.state in (State.FAILED, State.UPSTREAM_FAILED) 

for r in roots): 

logging.info('Marking run {} failed'.format(self)) 

self.state = State.FAILED 

 

# if all roots succeeded, the run succeeded 

elif all(r.state in (State.SUCCESS, State.SKIPPED) 

for r in roots): 

logging.info('Marking run {} successful'.format(self)) 

self.state = State.SUCCESS 

 

# if *all tasks* are deadlocked, the run failed 

elif unfinished_tasks and none_depends_on_past and no_dependencies_met: 

logging.info( 

'Deadlock; marking run {} failed'.format(self)) 

self.state = State.FAILED 

 

# finally, if the roots aren't done, the dag is still running 

else: 

self.state = State.RUNNING 

 

# todo: determine we want to use with_for_update to make sure to lock the run 

session.merge(self) 

session.commit() 

 

return self.state 

 

@provide_session 

def verify_integrity(self, session=None): 

""" 

Verifies the DagRun by checking for removed tasks or tasks that are not in the 

database yet. It will set state to removed or add the task if required. 

""" 

dag = self.get_dag() 

tis = self.get_task_instances(session=session) 

 

# check for removed tasks 

task_ids = [] 

for ti in tis: 

task_ids.append(ti.task_id) 

try: 

dag.get_task(ti.task_id) 

except AirflowException: 

if self.state is not State.RUNNING: 

ti.state = State.REMOVED 

 

# check for missing tasks 

for task in dag.tasks: 

if task.adhoc: 

continue 

 

if task.task_id not in task_ids: 

ti = TaskInstance(task, self.execution_date) 

session.add(ti) 

 

session.commit() 

 

 

class Pool(Base): 

__tablename__ = "slot_pool" 

 

id = Column(Integer, primary_key=True) 

pool = Column(String(50), unique=True) 

slots = Column(Integer, default=0) 

description = Column(Text) 

 

def __repr__(self): 

return self.pool 

 

@provide_session 

def used_slots(self, session): 

""" 

Returns the number of slots used at the moment 

""" 

running = ( 

session 

.query(TaskInstance) 

.filter(TaskInstance.pool == self.pool) 

.filter(TaskInstance.state == State.RUNNING) 

.count() 

) 

return running 

 

@provide_session 

def queued_slots(self, session): 

""" 

Returns the number of slots used at the moment 

""" 

return ( 

session 

.query(TaskInstance) 

.filter(TaskInstance.pool == self.pool) 

.filter(TaskInstance.state == State.QUEUED) 

.count() 

) 

 

@provide_session 

def open_slots(self, session): 

""" 

Returns the number of slots open at the moment 

""" 

used_slots = self.used_slots(session=session) 

return self.slots - used_slots 

 

 

class SlaMiss(Base): 

""" 

Model that stores a history of the SLA that have been missed. 

It is used to keep track of SLA failures over time and to avoid double 

triggering alert emails. 

""" 

__tablename__ = "sla_miss" 

 

task_id = Column(String(ID_LEN), primary_key=True) 

dag_id = Column(String(ID_LEN), primary_key=True) 

execution_date = Column(DateTime, primary_key=True) 

email_sent = Column(Boolean, default=False) 

timestamp = Column(DateTime) 

description = Column(Text) 

notification_sent = Column(Boolean, default=False) 

 

def __repr__(self): 

return str(( 

self.dag_id, self.task_id, self.execution_date.isoformat())) 

 

 

class ImportError(Base): 

__tablename__ = "import_error" 

id = Column(Integer, primary_key=True) 

timestamp = Column(DateTime) 

filename = Column(String(1024)) 

stacktrace = Column(Text)